技術部の染矢です。
最近は社内のデータ基盤「Bigfoot」の信頼性向上や機能開発をおこなっています。
Bigfootは、Google CloudのBigQueryとCloud Composerで主に構築された社内システムです。
この記事では、Cloud Composer環境におけるデータパイプラインの信頼性を向上させた事例を紹介します。
前半では、Cloud Composer環境において信頼性を低下させる原因であったソフトウェアのバグに気付くまでの調査過程を説明します。Cloud Composer環境で調査可能なメトリクスやログ、そして具体的なコマンドを紹介しながら説明していきます。
後半では、そのバグに関連する修正のためにOSSにコントリビュートしようとしたら、さらにバグを作ってしまった失敗談を綴ります。OSSのコードリーディングをしながら、そのバグの原因であったPythonのshallow copyの挙動についてサンプルコードとともに解説していきます。
Cloud Composer環境における信頼性とは
Cloud Composerは、Apache Airflow(以降"Airflow"と表記します)ベースのマネージドサービスです。
Airflowは、DAGを実行するためのプロダクトです。DAGとは、Directed Acyclic Graphの略で、日本語では有向非巡回グラフと呼ばれます。始まりから終わりに向かい、ループしないようなグラフ構造のことです。データパイプラインにおいては、例えば「データを取り込んだ後に、複数のデータ加工タスクを実行する」などといったタスク同士の依存関係を表現するために利用されます。
Bigfootでは、複数のDAGが毎日定期的に実行されています。
しかし、このDAGが、失敗するのです。
月に1回の失敗であればそこまで大きな問題にはなりません。しかし毎日何回も失敗されてしまうと、リカバリのために人手を使わなければいけなくなるし、何よりそのシステムを信頼できなくなります。
ユーザー(社内のBigfoot利用者)は、定義したDAGが毎回成功するものとして普段の業務をしています。しかし何度も失敗してしまうのであれば話が変わります。信頼できないシステムは使いたくなくなります。せっかく便利な仕組みを作ったのに、その仕組みが使われないのであればそれは即ちビジネス的な損失です。
そのため、DAGの成功率は高く保ち続ける必要があります。
ペパボのデータ基盤チームでは、各DAGの成功率をSLI(Service Level Indicator、サービスレベル指標)の一つとして指標化しています。また、そのSLIが維持するべき目標値をSLO(Service Level Objective、サービスレベル目標)として設定しています。SLOを破るような指標があれば機能開発の手を止めて信頼性回復のために時間を使うという運用にしています。
なぜか失敗するDAG
いつ頃からか、「なぜか失敗するDAG」が増えていました。DAGタスクの実行ログを見ても手がかりが得られないような失敗です。
前述の通り、DAGの失敗率は低く保ちたいため本腰を入れて調査をすることにしました。
調査1. Cloud Composerが提供するメトリクス
Cloud Composerは、その環境のメトリクスをいくつか提供しています。その中に、Pod evictions
というメトリクスがあります。確認してみると、DAGの失敗が発生した時刻と、Pod evictions
のメトリクスに変化があった時刻とが大まかに一致していました。
Pod evictionはKubernetesの概念です。PodがNode上で終了されることがPod evictionです。
Eviction is the process of terminating one or more Pods on Nodes.
参考: Scheduling, Preemption and Eviction | Kubernetes
Airflowの話をしていたと思ったら急にKubernetesの話をされて驚いている読者もいるかもしれませんが、Cloud ComposerはGKE(Google Kubernetes Engine)とその他いくつかのサービスを組み合わせてAirflow環境を提供するようなマネージドサービスです。そのためCloud ComposerにおいてはAirflowとKubernetesは密接に関連します。このあとしばらくはKubernetesの話です。
Airflowの構成要素の一つに、Workerというプロセスがあります。Workerプロセスが実際にDAGタスクを実行します。Cloud Composer上のAirflow環境ではWorkerプロセスはKubernetesの一つのPodとして動きます。
さて、このWorker PodにPod evictionが発生すると、タスクの実行に悪影響を及ぼします。単純にそのプロセスが中断されるため、そのプロセスによって実行されているタスクが「失敗」として扱われることに繋がります。
そのため、Pod evictionの発生とDAGの失敗には関連があると判断しました。
調査2. Cloud Composerが提供するログ
次に、なぜPod evictionが発生したのかを調べるためにログを調査しました。 Google Cloudではそれぞれのサービスから発生したログをCloud Loggingというサービスから調査できます。
Pod evictionが発生した時刻のログを調べていると、次のログを発見しました。
A worker pod was evicted at 2024-08-27T23:56:15Z with message: Pod ephemeral local storage usage exceeds the total limit of containers 5119Mi.
Pod ephemeral local storage usage exceeds the total limit
だそうです。他の時刻の失敗近辺のログも確認したところ、同じようなログを発見しました。ローカルストレージの使用量が上限を超えてしまったためにPodが終了されたと考えていいでしょう。
5119Miはほぼ5GiBです。Podのローカルストレージの使用量が5GiBを頻繁に超えてしまうことはあまり考えられません。ストレージについて何か異常なことが起きているようだ、と推測が立ちました。
調査3. GKEが提供するメトリクス
GKEはそのKubernetesクラスタのメトリクスを提供しています。
Cloud MonitoringのMetrics explorer機能によってメトリクスを確認できます。特定のPodのメトリクスに絞り込むために、Airflowのタスクのログから、失敗したタスクが実行されたPodを取得しました。
"Running <TaskInstance: some_service_task_1.wait_upstream_dags.get_execution_delta_hoge_fuga scheduled__2024-08-27T00:00:00+00:00 [running]> on host airflow-worker-fl28g"
Pod名はairflow-worker-fl28g
であることが分かります。
Cloud Monitoringでは、次のように、PromQLの形式でメトリクスをクエリできます。
kubernetes_io:container_ephemeral_storage_used_bytes{pod_name=~".*fl28g"}
このクエリから次のメトリクスが可視化されました。
Podの起動から30分足らずでローカルストレージの使用量が5GBを超えたことが分かります。すごい!!
調査4. コンテナのファイルシステム
Cloud Composerはマネージドサービスといえど、そのパーツはGKEです。Kubernetes APIにアクセスするための認証情報さえ得られれば、kubectl
コマンドによってクラスタ内部の状態をインタラクティブに調査できます。
GKEにおいては、必要な権限を得てからgcloud
コマンドによってKubernetesクラスタへの認証情報を得られます。
参考: kubectl をインストールしてクラスタ アクセスを構成する
ここから先は勘で進めていきました。メトリクスの動きやDAG失敗の頻度に若干の規則性がある気がしたため、それに合わせたタイミングでコンテナの内部状態を調査しました。
いろいろと調べた末に、次の通り原因を見つけました。
$ kubectl exec -it airflow-worker-hogefuga -- du -sh /tmp/*
7.6M /tmp/cosmos
406M /tmp/cosmos-venv0k4rdj03
406M /tmp/cosmos-venv3h0tgrmm
406M /tmp/cosmos-venv4i4noepm
406M /tmp/cosmos-venv5f9d2t4e
406M /tmp/cosmos-venv6ic6dauo
408M /tmp/cosmos-venv7jae8s7h
406M /tmp/cosmos-venv8_3_0sjh
406M /tmp/cosmos-venv93rohh8e
406M /tmp/cosmos-venvao9p5wmo
408M /tmp/cosmos-venveyb6hzm3
408M /tmp/cosmos-venvj8xutud8
408M /tmp/cosmos-venvjeikwmz1
406M /tmp/cosmos-venvjqmgubl9
406M /tmp/cosmos-venvk63f97dv
406M /tmp/cosmos-venvlr1q383r
406M /tmp/cosmos-venvlrb5ogu8
408M /tmp/cosmos-venvmu14p2ro
408M /tmp/cosmos-venvpafsre8l
406M /tmp/cosmos-venvqd5tdcze
406M /tmp/cosmos-venvs9894gir
408M /tmp/cosmos-venvu7tfllt4
408M /tmp/cosmos-venvwi1jztce
406M /tmp/cosmos-venvys0qyf4h
406M /tmp/cosmos-venvyu3tn_th
4.0K /tmp/hsperfdata_root
4.0K /tmp/pymp-k2c1n3t8
4.0K /tmp/tmp30d8de7m
272K /tmp/tmp3njoepht
4.0K /tmp/tmp3wbe7yno
4.0K /tmp/tmp4hyhr84m
4.0K /tmp/tmp8xk1vctx
4.0K /tmp/tmp90cli_w6
272K /tmp/tmpb1bgh6t2
4.0K /tmp/tmpcnrm_0og
272K /tmp/tmpfc17xxo_
4.0K /tmp/tmpfyfcef4z
4.0K /tmp/tmph760qvpa
272K /tmp/tmpi8_22z6r
4.0K /tmp/tmpi_dbzvi3
4.0K /tmp/tmpigpnxcei
272K /tmp/tmpiyx3q_cy
4.0K /tmp/tmpj2nmk7p5
272K /tmp/tmpla5x51qw
4.0K /tmp/tmpoy4unv2k
4.0K /tmp/tmpr5814b9d
4.0K /tmp/tmprwgwn3jv
4.0K /tmp/tmpsq9mqu1d
4.0K /tmp/tmpswy35_fw
272K /tmp/tmpt9i_ad1r
4.0K /tmp/tmpvivu9i4j
272K /tmp/tmpxtmv7b12
4.0K /tmp/tmpxwbgtytr
4.0K /tmp/tmpzubyeui2
kubeectl exec
コマンドによってPodのコンテナに対して実行コマンドを与えられます。du
コマンドは、ディスク使用量を確認するためのコマンドです。-h
オプションは、人間が読みやすいKBやMBの単位で出力するためのオプションです。-s
オプションは、それぞれのディレクトリのサブディレクトリは表示せず、そのディレクトリの使用量合計のみを出力するためのオプションです。
見ての通り、/tmp/cosmos-venv*
というディレクトリが、それぞれ400MB程度使っています。さらにそのディレクトリが10以上もあります。これなら5GBを超えるのにも納得できます。いや、納得できません。なぜ/tmp
ディレクトリでこんなに容量を使っているのでしょうか。/tmp
ならすぐに消えてもよさそうに思えますが、メトリクスの動きを見るに、消えずに残り続けているようです。
Cosmosとは
BigfootにおいてはAirflowのタスクとしてdbtを実行するために、astronomer/astronomer-cosmos(以降、"Cosmos"と呼びます)というプロダクトを使用しています。
dbtとは、データ変換をソフトウェアエンジニアリングの手法で管理・実行するためのツールです。
膨れ上がっているディレクトリ名から、このCosmosが原因ではないかとアタリをつけました。
解消
CosmosのGitHubリポジトリのissueを探して読むと、この/tmp/cosmos-venv*
ディレクトリが消えない事象は既知の問題であり、バージョン1.6.0以降で修正されているらしいということが分かりました。
早速、Bigfootで使用しているバージョンを1.5.0から1.6.0に変更したところ、目に見える効果が現れました。
ephemeral storageの使用量が頭打ちになるようなグラフから、一時的に上がって下がる山のようなグラフになったことが分かります。 解決したかった事象である、Pod evictionによるDAG Runの失敗も減りました。
一件落着ならよかったのに
さて、私はCosmosの一時ディレクトリが消えない問題を調査するために、自分のマシン上でAirflow環境を構築してCosmosの挙動を検証していました。 その途中で、不思議な挙動に気がつきました。バージョン1.5.0から1.6.0への変更によって、一時ディレクトリが消えない問題が解消された代わりに、Pythonの仮想環境構築が2回実行されるようになったのです。
おそらくこれは意図していない挙動であると私は考えました。一回のdbtコマンドの実行において、仮想環境の構築は一回で十分なはずです。バージョン1.5.0では一つのタスク実行において仮想環境の構築は一回のみでした。
そこで、CosmosのGitHubリポジトリにこの問題を解決するためのPull Requestを作成しました。
https://github.com/astronomer/astronomer-cosmos/pull/1200
このPRはマージされました。しかし、この変更によって重大な問題が発生していることに気がつきました。
仮想環境が使われない
私の変更が含まれた新しいバージョン1.7.0では、仮想環境のdbtではなく、そのホスト環境のdbtが実行されていました。 コードを確認してみると、どうやら実行するPythonのパスが、仮想環境のパスではなく、デフォルトのホスト環境のパスになっているようです。
私は自分の変更箇所について十分なカバレッジになるようテストを書きつつ、手元の環境でも検証をしていたつもりでした。それでも気がつけない問題が入り込んでいました。
揮発するプロパティ
手元のAirflow環境でCosmosを使ったdbtタスクを実行させつつ、デバッグのためのログを出してみると、興味深い事象が見つかりました。
タスク実行の途中で設定しているはずのOperatorインスタンスのプロパティの値が、リセットされているように見えました。Operatorとは、AirflowにおいてDAGタスクを実際に実行するクラスです。
実行するPythonのパスはself._py_bin
というプロパティに設定されています。
また、CosmosのOperatorにおいてはrun_subprocess()
メソッドによって実際のタスクが実行されます。
バージョン1.7.0では、実行処理がこのrun_subprocess()
メソッドに突入した瞬間に、self._py_bin
の値がNone
になっていました。run_subprocess()
メソッドが実行される前では、確かにself._py_bin
の値が仮想環境のパス/tmp/cosmos-venvhoge/bin/dbt
だったにも拘らずです。
入れ替わるオブジェクトインスタンス
もう少し調べてみると、どうやらrub_subprocess()
メソッドに入った時点で、self
自体が異なるインスタンスになっているようだと発見しました。
メソッドの至る所に次のデバッグコードを仕込んで検証しました。
self.log.info("method called on instance %s", id(self))
参考: 組み込み関数 — Python 3.13.1 ドキュメント
id()
関数は、オブジェクトの "識別値" を返すPythonの組み込み関数です。
オブジェクトの "識別値" を返します。この値は整数で、このオブジェクトの有効期間中は一意かつ定数であることが保証されています。有効期間が重ならない 2 つのオブジェクトは同じ id() 値を持つかもしれません。
このデバッグログの結果は、次の通りでした。
...
[2024-10-08, 05:24:15 UTC] {virtualenv.py:145} INFO - execute called on instance 275939531936
[2024-10-08, 05:24:15 UTC] {virtualenv.py:158} INFO - _prepare_virtualenv called on instance 275939531936
[2024-10-08, 05:26:20 UTC] {virtualenv.py:90} INFO - run_subprocess called on instance 275940314992 # ここでselfが異なるインスタンスになっている
...
run_subprocess()
メソッドに入った時点で、self
が異なるインスタンスになっています。なんてこった。
何が起きているのでしょうか。実は、この現象は、Cosmosのバージョン1.5.0において一時ディレクトリが消えない現象と同じ部分を根にしていました。バージョン1.5.0から1.7.0においてずっと同じ潜在的なバグがあったということです。1.5から1.6で消えたように見えたバグが、私の変更によって再度顕在化したのです。
AirflowにおけるOperatorインスタンスのshallow copy
問題は、Cosmosのコードの次の部分にありました。
https://github.com/astronomer/astronomer-cosmos/blob/534a14ebc7f56158f92bf8fe183a0b4f5e49f92a/cosmos/operators/local.py#L192-L194
def _set_invocation_methods(self) -> None:
"""Sets the associated run and exception handling methods based on the invocation mode."""
if self.invocation_mode == InvocationMode.SUBPROCESS:
self.invoke_dbt = self.run_subprocess
self.handle_exception = self.handle_exception_subprocess
elif self.invocation_mode == InvocationMode.DBT_RUNNER:
self.invoke_dbt = self.run_dbt_runner
self.handle_exception = self.handle_exception_dbt_runner
この_set_invocation_methods
メソッドは__init__
メソッドで実行されます。条件に応じてself.invoke_dbt
に自身のメソッドを代入するという処理です。self.invoke_dbt
に代入されるメソッドは、run_subprocess
かrun_dbt_runner
です。
問題は、self.run_subprocess()
実行中におけるself
が、それを実行しているインスタンスと一致するとは限らないということです。
例えば、このインスタンスがshallow copyされたらどうなるでしょうか。次のようなコードを考えます。
# test_shallow_copy.py
import copy
class MyClass:
def __init__(self):
self.value = "original value"
self.bound_method = self.get_value # メソッドを属性として保存
def set_value(self):
self.value = "another value"
def get_value(self):
print(f"self id in get_value: {id(self)}")
return self.value
# オリジナルインスタンス
original_instance = MyClass()
print(f"Original instance id: {id(original_instance)}")
# shallow copyを作成
copied_instance = copy.copy(original_instance)
print(f"Copied instance id: {id(copied_instance)}")
# コピーされたインスタンスのメソッドを実行
copied_instance.set_value()
result = copied_instance.bound_method()
print(f"Result: {result}")
result = copied_instance.get_value()
print(f"Result: {result}")
実行結果は次のようになります。
$ python test_shallow_copy.py
Original instance id: 4369879312
Copied instance id: 4369887824
self id in get_value: 4369879312
Result: original value
self id in get_value: 4369887824
Result: another value
shallow copyであるため、copied_instance
には、original_instance
の中のオブジェクトへの参照が挿入されます。そのため、copied_instance
のbound_method()
はcopied_instance
ではなくoriginal_instance
のget_value()
メソッドを参照しています。なので、copied_instance.bound_method()
を実行した結果にはcopied_instance.set_value()
の結果は反映されません。
参考: copy — 浅いコピーおよび深いコピー操作 — Python 3.13.1 ドキュメント
浅いコピー (shallow copy) は新たな複合オブジェクトを作成し、その後 (可能な限り) 元のオブジェクト中に見つかったオブジェクトに対する 参照 を挿入します。
さて、ここでAirflowにおけるOperatorインスタンスに対する操作をコードから追っていきましょう。
https://github.com/apache/airflow/blob/5f4a30117d82e5981a36cec99a0bbda2bddb54d9/airflow/models/taskinstance.py#L2951-L2957
TaskInstance
クラスのrun()
メソッドでは_run_raw_task()
関数を呼び出します。
def run(
self,
verbose: bool = True,
ignore_all_deps: bool = False,
ignore_depends_on_past: bool = False,
wait_for_past_depends_before_skipping: bool = False,
ignore_task_deps: bool = False,
ignore_ti_state: bool = False,
mark_success: bool = False,
test_mode: bool = False,
pool: str | None = None,
session: Session = NEW_SESSION,
raise_on_defer: bool = False,
) -> None:
...
self._run_raw_task(
mark_success=mark_success,
test_mode=test_mode,
pool=pool,
session=session,
raise_on_defer=raise_on_defer,
)
https://github.com/apache/airflow/blob/5f4a30117d82e5981a36cec99a0bbda2bddb54d9/airflow/models/taskinstance.py#L261-L262
_run_raw_task()
関数では、TaskInsntance
インスタンスのtask
(Operatorインスタンス)に対してprepare_for_execution()
メソッドを呼び出します。
def _run_raw_task(
ti: TaskInstance,
mark_success: bool = False,
test_mode: bool = False,
pool: str | None = None,
raise_on_defer: bool = False,
session: Session | None = None,
) -> TaskReturnCode | None:
...
with set_current_task_instance_session(session=session):
ti.task = ti.task.prepare_for_execution()
https://github.com/apache/airflow/blob/8ca061ddf5fb85c79b1212ca29112190ebb0aab5/airflow/models/baseoperator.py#L704-L708
BaseOperator
クラスのprepare_for_execution()
メソッドでは、自身をshallow copyしたインスタンスを返します。
def prepare_for_execution(self) -> BaseOperator:
"""Lock task for execution to disable custom action in ``__setattr__`` and return a copy."""
other = copy.copy(self)
other._lock_for_execution = True
return other
このAirflowの機構によって、CosmosのOperatorインスタンスが実行される際にはrun_subprocess()
メソッド内におけるself
がshallow copyされる前のインスタンスになるという仕組みです。それによって、実行時に設定しているはずのself._py_bin
プロパティの値がNone
になってしまっていました。その結果、self._py_bin
で指定されるはずのpython仮想環境が使われなかったのです。そして同様に、Cosmosのバージョン1.5.0における一時ディレクトリが消えない問題は、一時ディレクトリのパスを保持するプロパティにrun_subprocess()
内でアクセスしていたことが原因でした。実行時に一時ディレクトリのパスがプロパティに記録されるのに、実行前のインスタンスのプロパティが参照されていたため、一時ディレクトリが消えなかったということです。
三件落着
問題に気がついた私は、CosmosのGitHubリポジトリに、この問題を報告するためのIssueを作成しました。
ちなみに、このときの私はshallow copyによる問題であると理解しておらず、Airflowのdagpickle
によるものでは?とコメントしています。
そして、問題を修正するためのPull Requestを作成しました。
https://github.com/astronomer/astronomer-cosmos/pull/1252
私のPRは無事にマージされました。これによって、一時ディレクトリが消えない問題と仮想環境が使われない問題の両方の原因が解消されました。さらに、一つのdbtタスクに対する仮想環境の構築回数が2回から1回になったことで、dbtタスクの実行時間も削減されました。ペパボ社内の環境における変更前後の一週間分のdbtタスクの実行時間を比べてみると、2割以上減っていました。
まとめ
以上、Cloud Composer環境の信頼性向上のためのトラブルシュート事例を紹介しました。shallow copy, deep copyの違いによるトラブルはたまに見かけますが、まさかここでも出会うことになるとは、Airflowのコードから該当箇所を見つけるまで予期していませんでした。
また、バグを見つけて修正できた!と思ったら致命的なバグを出してしまったので私は落ち込んでいました。しかし、修正のPRをCosmosのメンテナの方にレビューしていただいたときに、印象に残る言葉をかけてもらいました。曰く、「コードを書くときには常にバグを出すリスクがあります。そして、今回のようなケースではレビュワーはPRを出した本人と同等の責任を負います。なので自分を責めないでください(筆者による和訳)」とのことでした。
OSSにバグを仕込んでしまったことを誇るわけではありませんし、自分の失敗を正当化する意図もありません。しかし、このように許容してもらえて私は安堵しました。
そして、ソフトウェアエコシステムの一部として活動するエンジニアの一人として、大切なことを学べたと思っています。それは、よく「HRT」という言葉で表される、謙虚・尊敬・信頼の重要性です。Cosmosのメンテナの方のコミュニケーションのとりかたは、まさにHRTな態度であったと思います。
現代ではまだ、多くのソフトウェアは人間が書くものです。そのため、人間同士でそれぞれのパフォーマンスを最大化できるように、謙虚・尊敬・信頼にあふれたコミュニケーションをとることが重要であると感じました。
大きめの失態を技術記事として公開することに引け目もありましたが、それ以上に学びの大きい体験であったためこの記事を執筆しました。