データ基盤 データエンジニアリング dbt

astronomer-cosmosでdbtモデルの実行条件を柔軟に制御する

データ基盤 データエンジニアリング dbt

技術部データ基盤チームの@tosh2230です。この記事では、astronomer-cosmosdbtのモデル実行条件をタグで柔軟に制御する方法についてご紹介します。

記事執筆時点での利用バージョンは下記のとおりです。

  • astronomer-cosmos: 1.3.2
  • dbt-bigquery: 1.5.4
  • Apache Airflow: 2.6.3
  • Cloud Composer: 2.4.6
  1. 結論
  2. astronomer-cosmos
    1. Airflowのコンポーネントでdbtモデルを管理
    2. dbt実行環境の作成
    3. dbt testの自動実行
  3. Cosmos運用開始にあたっての課題
    1. モデルごとに適切な実行頻度が異なる
    2. 複数のモデルオーナーがいる
  4. 解決手段
    1. モデル実行条件の制御
    2. 複数のタグをAND条件で指定するにはParsing methodを変更する
    3. CI/CDでのmanifest.jsonの生成
  5. まとめ

結論

  • astronomer-cosmosでdbtモデルの実行条件を柔軟に制御するのにタグを使いました
  • DbtTaskGroupをモデルオーナーや実行周期ごとに作っています
  • RenderConfig.selectでタグをAND条件で指定するためにはparsing methodの設定変更が必要です

astronomer-cosmos

astronomer-cosmos(以下、Cosmos)は、transformツールであるdbtをApache Airflow(以下、Airflow)で使いやすくするためのPythonパッケージです。 AirflowのマネージドサービスAstroを運営している、Astronomerが開発しています。ライセンスはApache-2.0 licenseです。

詳細な機能はドキュメントを参照いただくとして、ここではcosmosの代表的な機能をいくつかご紹介します。

  • Airflowのコンポーネントでdbtモデルを管理
  • dbt実行環境の作成
  • testの自動実行

Airflowのコンポーネントでdbtモデルを管理

Cosmosはdbtモデルの依存関係をパースしてAirflow DAGへ変換します。これにより、DAGやtask, task groupといったAirflowのコンポーネントでdbtモデルを表現できます。

dbtモデルを変換するには、DbtDagクラスもしくはDbtTaskGroupクラスを使います。両クラスはそれぞれDAGクラス、TaskGroupクラスを継承しています。dbt固有の設定は主に下記の4つです。

dbtモデルがAirflowコンポーネントに置き換わることで、Airflowの資産を活用できます。例えば、モデル実行で必要となる接続情報はAirflow connectionを使えます。Airflow DAGの監視・通知機能を実装している場合は、それをdbtで活かすこともできます。

dbt実行環境の作成

dbtを運用するうえでの検討事項の一つに「どこで実行するか?」があります。Cosmosを使うと、Airflowにおいて複数の方法で実行する環境を提供してくれます。

  • local
  • virtualenv
  • docker
  • kubernetes
  • azure_container_instance

前項で触れたExecutionConfigで設定できます。詳細についてはExecution Modesをご参照ください。

ちなみにペパボでは、AirflowをCloud Composerやオンプレミスに構築したKubernetesで運用しているため、共通して構築できるvirtualenvを指定しています。

dbt testの自動実行

Cosmosはdbt runに続けてdbt testを実行するようにDAGをつくります。モデルに対するテストを追加すると自動的にDAGへ反映されますので、追加忘れを防げて便利です。

Cosmos運用開始にあたっての課題

これまで見てきたように、Cosmosは既にAirflowを使っている場合に有力な手段となり得ます。しかしながら、運用を始めるにあたって解決したい課題が2つありました。

モデルごとに適切な実行頻度が異なる

dbtを実行するDAGは、1時間置きに実行するスケジュールに設定しています。そのDAGですべてのモデルを実行すると、データウェアハウスであるGoogle BigQueryへの負荷が心配になります。モデルの中には更新頻度は日次で十分なものがあるので、指定した時刻以外はスキップしたいというニーズがありました。

複数のモデルオーナーがいる

事業横断でデータ分析ができるように、ペパボで運営している事業すべてを単一のdbt projectで管理する決定をしました。これはつまり、モデルオーナーが複数いることになります。 事業ごとの適切な権限管理やクエリ実行の傾向把握を目的に、モデルごとに使用するGoogle Cloudのサービスアカウントを切り替えることが求められました。

解決手段

これら2つの課題を解決するために、DbtTaskGroupとタグを組み合わせて使いました。

モデル実行条件の制御

下記のコードは、モデル実行頻度やモデルオーナーごとにDbtTaskGroupを作る例です。

MODEL_OWNER = "marketing"
dbt_hourly = DbtTaskGroup(
    group_id=f"dbt_hourly_{MODEL_OWNER}",
    project_config=project_config,
    profile_config=get_profile_config(model_owner=MODEL_OWNER),
    execution_config=execution_config,
    render_config=RenderConfig(
        load_method=LoadMode.DBT_MANIFEST,
        select=[f"tag:hourly,tag:{MODEL_OWNER}"],
    ),
    operator_args=operator_args,
)

RenderConfig.selectで"hourly"と"marketing"のタグがついているモデルをDbtTaskGroupの対象にしています。

models:
  - name: marketing__sample_model
    config:
      materialized: view
      tags:
        - hourly
        - marketing

また、モデルオーナーごとにサービスアカウントを切り替えるため、ProfileConfigを作る関数を以下のように実装しています。

def get_profile_config(model_owner: str):
    return ProfileConfig(
        profile_name=model_owner,
        target_name=os.environ["ENV"],
        profile_mapping=GoogleCloudServiceAccountFileProfileMapping(
            conn_id=get_google_cloud_conn_id(owner=model_owner),
            profile_args={
                "project": DEFAULT_PROJECT,
                "dataset": DEFAULT_DATASET,
                "keyfile": get_service_account_keyfile(owner=model_owner),
                "priority": "batch",
            },
        ),
    )

実行タイミングを毎日9時にしたいモデルができた場合は、たとえばtag: daily_0900のように指定したDbtTaskGroupをつくります。そしてBranchPythonOperatorを使って、logical dateに応じてスキップするか否かを判定します1

複数のタグをAND条件で指定するにはParsing methodを変更する

複数のタグをAND条件で指定できるか否かは、RenderConfig.load_methodの設定に依存します。Cosmosのドキュメントにならい、以下の文章ではこの設定をparsing methodと呼称します。

Parsing methodのデフォルトはautomaticですが、今回の環境では最終的にcustomが選ばれるためcosmos独自のdbt parserがモデルをパースします。しかしcustomの場合は、複数のタグをAND条件で指定することはできないという既知の問題があります。

そこで、Parsing methoddbt_manifestに変更しました2。このパース方法はdbt compileで作られるmanifest.jsonファイルをパース対象にします。

CI/CDでのmanifest.jsonの生成

manifest.jsonファイルは、デフォルト設定ではtarget/manifest.jsonに作成されます。Airflow DAGを組み立てるうえで必要になりますので、 CI/CDを行っているGitHub Actionsジョブの中でmanifest.jsonファイルを作成して、DAG bagへ追加します。既存のデプロイ処理に下記のようなステップを追加しました。

- name: Update dbt manifest.json
  working-directory: dags/dbt
  shell: bash
  run: |
    diff_seeds=$(git diff --name-only --diff-filter=ADR origin/main...HEAD | grep 'dags/dbt/seeds/' || true)
    diff_others=$(git diff --name-only --diff-filter=ADMR origin/main...HEAD | grep 'dags/dbt/' | grep -v 'dags/dbt/seeds/' || true)
    if [ -n "$diff_seeds" ] || [ -n "$diff_others" ]; then
      mkdir /github/home/.dbt
      cp profiles_ci.yml /github/home/.dbt/profiles.yml
      poetry run dbt deps
      poetry run dbt compile
    fi

まとめ

Cosmosでdbtモデルの実行条件を柔軟に制御する方法として、DbtTaskGroupとタグを組み合わる例をご紹介しました。Cosmosは一度整備するとdbtモデルの実装に集中できる環境をつくれますので、Airflowを運用しているdbtユーザーにおすすめのパッケージです。


  1. 実行タイミングごとにDAGを分ける選択肢もあると思いますが、いまのところのパターンが少ないので1つのDAGで実装しています。 

  2. Parsing methodがdbt_manifestである場合、ノード名がaliasになってしまう問題がありました。これはcosmos v1.2.5で修正されていますので、必要に応じてバージョンアップするとよいと思います。