技術部データ基盤チームの@tosh2230です。この記事では、astronomer-cosmosでdbtのモデル実行条件をタグで柔軟に制御する方法についてご紹介します。
記事執筆時点での利用バージョンは下記のとおりです。
- astronomer-cosmos: 1.3.2
- dbt-bigquery: 1.5.4
- Apache Airflow: 2.6.3
- Cloud Composer: 2.4.6
結論
- astronomer-cosmosでdbtモデルの実行条件を柔軟に制御するのにタグを使いました
- DbtTaskGroupをモデルオーナーや実行周期ごとに作っています
- RenderConfig.selectでタグをAND条件で指定するためにはparsing methodの設定変更が必要です
astronomer-cosmos
astronomer-cosmos(以下、Cosmos)は、transformツールであるdbtをApache Airflow(以下、Airflow)で使いやすくするためのPythonパッケージです。
AirflowのマネージドサービスAstroを運営している、Astronomerが開発しています。ライセンスはApache-2.0 license
です。
詳細な機能はドキュメントを参照いただくとして、ここではcosmosの代表的な機能をいくつかご紹介します。
- Airflowのコンポーネントでdbtモデルを管理
- dbt実行環境の作成
- testの自動実行
Airflowのコンポーネントでdbtモデルを管理
Cosmosはdbtモデルの依存関係をパースしてAirflow DAGへ変換します。これにより、DAGやtask, task groupといったAirflowのコンポーネントでdbtモデルを表現できます。
dbtモデルを変換するには、DbtDagクラスもしくはDbtTaskGroupクラスを使います。両クラスはそれぞれDAGクラス、TaskGroupクラスを継承しています。dbt固有の設定は主に下記の4つです。
- ProjectConfig: dbt projectに関する設定
- ProfileConfig: dbt profileに関する設定
- ExecutionConfig: Airflowでのdbt projectの実行に関する設定
- RenderConfig: dbt projectからAirflow DAG,Task Groupへの変換に関する設定
dbtモデルがAirflowコンポーネントに置き換わることで、Airflowの資産を活用できます。例えば、モデル実行で必要となる接続情報はAirflow connectionを使えます。Airflow DAGの監視・通知機能を実装している場合は、それをdbtで活かすこともできます。
dbt実行環境の作成
dbtを運用するうえでの検討事項の一つに「どこで実行するか?」があります。Cosmosを使うと、Airflowにおいて複数の方法で実行する環境を提供してくれます。
- local
- virtualenv
- docker
- kubernetes
- azure_container_instance
前項で触れたExecutionConfigで設定できます。詳細についてはExecution Modesをご参照ください。
ちなみにペパボでは、AirflowをCloud Composerやオンプレミスに構築したKubernetesで運用しているため、共通して構築できるvirtualenv
を指定しています。
dbt testの自動実行
Cosmosはdbt run
に続けてdbt test
を実行するようにDAGをつくります。モデルに対するテストを追加すると自動的にDAGへ反映されますので、追加忘れを防げて便利です。
Cosmos運用開始にあたっての課題
これまで見てきたように、Cosmosは既にAirflowを使っている場合に有力な手段となり得ます。しかしながら、運用を始めるにあたって解決したい課題が2つありました。
モデルごとに適切な実行頻度が異なる
dbtを実行するDAGは、1時間置きに実行するスケジュールに設定しています。そのDAGですべてのモデルを実行すると、データウェアハウスであるGoogle BigQueryへの負荷が心配になります。モデルの中には更新頻度は日次で十分なものがあるので、指定した時刻以外はスキップしたいというニーズがありました。
複数のモデルオーナーがいる
事業横断でデータ分析ができるように、ペパボで運営している事業すべてを単一のdbt projectで管理する決定をしました。これはつまり、モデルオーナーが複数いることになります。 事業ごとの適切な権限管理やクエリ実行の傾向把握を目的に、モデルごとに使用するGoogle Cloudのサービスアカウントを切り替えることが求められました。
解決手段
これら2つの課題を解決するために、DbtTaskGroupとタグを組み合わせて使いました。
モデル実行条件の制御
下記のコードは、モデル実行頻度やモデルオーナーごとにDbtTaskGroupを作る例です。
MODEL_OWNER = "marketing"
dbt_hourly = DbtTaskGroup(
group_id=f"dbt_hourly_{MODEL_OWNER}",
project_config=project_config,
profile_config=get_profile_config(model_owner=MODEL_OWNER),
execution_config=execution_config,
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST,
select=[f"tag:hourly,tag:{MODEL_OWNER}"],
),
operator_args=operator_args,
)
RenderConfig.selectで"hourly"と"marketing"のタグがついているモデルをDbtTaskGroupの対象にしています。
models:
- name: marketing__sample_model
config:
materialized: view
tags:
- hourly
- marketing
また、モデルオーナーごとにサービスアカウントを切り替えるため、ProfileConfigを作る関数を以下のように実装しています。
def get_profile_config(model_owner: str):
return ProfileConfig(
profile_name=model_owner,
target_name=os.environ["ENV"],
profile_mapping=GoogleCloudServiceAccountFileProfileMapping(
conn_id=get_google_cloud_conn_id(owner=model_owner),
profile_args={
"project": DEFAULT_PROJECT,
"dataset": DEFAULT_DATASET,
"keyfile": get_service_account_keyfile(owner=model_owner),
"priority": "batch",
},
),
)
実行タイミングを毎日9時にしたいモデルができた場合は、たとえばtag: daily_0900
のように指定したDbtTaskGroupをつくります。そしてBranchPythonOperatorを使って、logical dateに応じてスキップするか否かを判定します1。
複数のタグをAND条件で指定するにはParsing methodを変更する
複数のタグをAND条件で指定できるか否かは、RenderConfig.load_method
の設定に依存します。Cosmosのドキュメントにならい、以下の文章ではこの設定をparsing methodと呼称します。
Parsing methodのデフォルトはautomatic
ですが、今回の環境では最終的にcustom
が選ばれるためcosmos独自のdbt parserがモデルをパースします。しかしcustom
の場合は、複数のタグをAND条件で指定することはできないという既知の問題があります。
そこで、Parsing methodをdbt_manifest
に変更しました2。このパース方法はdbt compile
で作られるmanifest.json
ファイルをパース対象にします。
CI/CDでのmanifest.jsonの生成
manifest.json
ファイルは、デフォルト設定ではtarget/manifest.json
に作成されます。Airflow DAGを組み立てるうえで必要になりますので、
CI/CDを行っているGitHub Actionsジョブの中でmanifest.json
ファイルを作成して、DAG bagへ追加します。既存のデプロイ処理に下記のようなステップを追加しました。
- name: Update dbt manifest.json
working-directory: dags/dbt
shell: bash
run: |
diff_seeds=$(git diff --name-only --diff-filter=ADR origin/main...HEAD | grep 'dags/dbt/seeds/' || true)
diff_others=$(git diff --name-only --diff-filter=ADMR origin/main...HEAD | grep 'dags/dbt/' | grep -v 'dags/dbt/seeds/' || true)
if [ -n "$diff_seeds" ] || [ -n "$diff_others" ]; then
mkdir /github/home/.dbt
cp profiles_ci.yml /github/home/.dbt/profiles.yml
poetry run dbt deps
poetry run dbt compile
fi
まとめ
Cosmosでdbtモデルの実行条件を柔軟に制御する方法として、DbtTaskGroupとタグを組み合わる例をご紹介しました。Cosmosは一度整備するとdbtモデルの実装に集中できる環境をつくれますので、Airflowを運用しているdbtユーザーにおすすめのパッケージです。
-
実行タイミングごとにDAGを分ける選択肢もあると思いますが、いまのところのパターンが少ないので1つのDAGで実装しています。 ↩
-
Parsing methodが
dbt_manifest
である場合、ノード名がaliasになってしまう問題がありました。これはcosmos v1.2.5で修正されていますので、必要に応じてバージョンアップするとよいと思います。 ↩