データ基盤

データ基盤のワークフローエンジンをGoogle Cloud Composer 2へバージョンアップしました

データ基盤

技術部データ基盤チームに所属しているまつもとです。

ペパボのデータ基盤では、ワークフローエンジンとしてGoogle Cloud Composerを利用しています。2021年のGoogle Cloud Composerバージョンアップに続き、昨年2022年12月に大きなバージョンアップを行いました。2021年はCloud Composerで稼働するApache Airflowをversion 1からversion 2に変更する大きなバージョンアップでした。そして、今回はCloud Composer自体をversion 1からversion 2に変更するものです。

移行作業においてはバージョンアップによる影響を最小限に留めるため、Airflowのバージョンは変えず、Cloud Composerのバージョンのみを変更しました。

この記事ではバージョンアップ作業において苦労した点や、作業時に遭遇した不具合への対処について記載します。

バージョンアップの概要

前回は、composer-1.15.1-airflow-1.10.14からcomposer-1.17.6-airflow-2.1.4へのバージョンアップのため、Airflowのメジャーバージョンアップ(version 1から2)が大きなポイントでした。このバージョンアップによりスケジューラーの性能が向上し、ワークフローの実行時間が全体的に短縮しました。

一方で今回は、Cloud Composerのメジャーバージョンアップ(composer-1.17.6-airflow-2.1.4からcomposer-2.0.27-airflow-2.1.4へ)が大きなポイントです。

Cloud Composerのメジャーバージョンによる違いはこちらに記載されています。一番大きな違いは、Cloud Composer環境が稼働しているk8sクラスターのサービスであるGoogle Kubernetes Engineのオペレーションモードにあります。Cloud Composer 1ではノードを手動で管理するstandardモードで稼働していますが、Cloud Composer 2ではワークロードの負荷に応じて自動でノードがスケールするAutopilotモードで稼働します。そのため、Airflowのコンポーネントの負荷が上がった場合はノードが自動でスケールするのはもちろんのこと、KubernetesPodOperatorなどタスクでk8sのワークロードを利用するような場合でもオートスケールの恩恵を受けることができます。

ペパボのデータ基盤では、ELTの処理をKubernetesPodOperatorを使って行っている部分が多く、タスクの増減に伴ってk8sクラスタのノード数の調整を都度手動で行ってきました。GKEのAutopilotモードになることでこれらの手間から開放されることが期待されます。

バージョンアップ作業の概要

今回のバージョンアップ作業のトピックは以下の通りです。

  • Airflowのバージョンは変更しなかったためDAGの稼働確認は最低限に済ませることができた。
  • KubernetesPodOperatorでGKEのオートスケーリングの恩恵を受けるために若干の変更作業が必要であった。
  • KubernetesPodOperatorの処理が完了してもrunning状態でスタックする事象が発生し回避策を講じた。

Cloud Composer 2でKubernetesPodOperatorを使う場合、2番目の内容が特に重要なポイントになります。

ここからは、以上3点について1つずつ説明していきます。

DAGの事前の稼働確認は最低限で済んだ

今回のバージョンアップでは、Airflowのバージョンは変えずCloud Composerのバージョンだけ変更しました。そのため、既存のDAGにほとんど手を入れることなく移行することができました。

Cloud Composerをバージョンアップする際は、AirflowとCloud Composerのバージョンアップ作業を個別に行うことで、移行後に既存のDAGで不具合が起きる可能性を減らすことができます。ただし、バージョンアップを1年に1回など低頻度で行っているとEOLが迫ってきて慌てて移行作業を行うというような事態になりがちです(今回若干EOLを超過してしまったのが反省点です)。

バージョンアップを気軽に行えるように、DAGの設計を工夫することが対策の一つとして考えられます。例えば、DAG実装時にAirflowが用意しているOperatorを直接使うのではなく、それらをラップしてフレームワーク化することで修正箇所を局所化させ、しっかりとテストを実装しておく、というような方法が良いのではと考えています。この点については、今後取り組んでいきたい課題です。

KubernetesPodOperatorの設定変更が必要

DAGの実装にほとんど手を入れる必要がなかったと書きましたが、KubernetesPodOperatorを使っている部分については対応が必要になります。

KubernetesPodOperator用Node Poolの廃止

移行に伴い、KubernetesPodOperatorでNode AffinityやNode Tolerationの指定を廃止しました。

KubernetesPodOperatorはAirflowが稼働しているk8sクラスタ内のノードにPODをスケジュールします。Airflowのスケジューラーやワーカーと同じノードでKubernetesPodOperatorのワークロードを実行することでCPUやメモリなどのリソース不足が発生する可能性があります。1 そのため、ペパボではk8sクラスタにKubernetesPodOperator専用のノードプールを設けて、そこでPODをスケジュールしていました。

一方でCloud Composer 2では、環境のGKEクラスタがAutopilotモードになり、負荷に応じてノードがオートスケールします。そのため、Airflowのワーカーなどと同じノードプールでKubernetesPodOperatorのPODをスケジュールしてもリソース不足でスケジュールが失敗するようなことも起きません(正確にはノード数の上限を設定しているのでそれを超えない限りはスケジュールが失敗しません)。この変更に合わせて、KubernetesPodOperator用のノードプールを廃止したため、Node AffinityやNode Tolerationの指定も不要になりました。

k8s カスタムnamespaceの新設

さらに、KubernetesPodOperator専用のk8s namespaceを設けて、そのnamespaceでPODをスケジュールするように変更しました。

このような構成にしたのは、default namespaceだと、KubernetesPodOperatorのリソース要求に応じてノードがオートスケールしなかったためです。

default namespaceでは、Cloud Composer独自のカスタム指標に基づいてオートスケールする2ため、KubernetesPodOperatorによるPODのリソース要求ではオートスケールしなかったものと推測しています。そのため、KubernetesPodOperator専用のnamespaceを設け、PODのリソース要求に基づいてオートスケールするようにしました。(参考: KubernetesPodOperatorの構成)

KubernetesPodOperatorの不具合への対応

Cloud Composer 2固有の問題ではないのですが、移行後に起きた事象への対応についても記載しておきます。

事象

KubernetesPodOperatorによるタスクの実行時に、PODの実行が完了しているにも関わらずタスクのStatusがrunning状態のままスタックしてしまう事象が発生しました。

原因(推測)

Airflowのissueに上がっていた事象と同じであったため、KubernetesPodOperatorがPODの終了を確認する処理に問題があると判断しました。

これは、コンテナで大量のログを出力すると、コンテナが終了したにも関わらずログを収集するプロセスがハングしてしまう可能性があるというものでした。そのため、KubernetesPodOperatorがログを待ち続けてタスクのStatusがrunning状態でスタックしてしまうというものです。

現在の環境では、KubernetesPodOperatorで小さく分割されたファイルをS3からGoogle Cloud Storageへ大量に転送する処理をおこなっており、各ファイルの転送が完了するたびに標準出力へログを出力する実装をしていました。ここで出力される大量のログがログ収集プロセスのハングを引き起こす原因と考えました。

対応

KubernetesPodOperatorのbugfixとしてPull Requestが出ていますが、ワークアラウンドとしてファイル転送処理におけるログの出力を抑制する対応を行いました。これにより、KubernetesPodOperatorがrunning状態でスタックする事象は解消されました。

移行を終えて

GKEのAutopilotモードへの変更に対するKubernetesPodOperator周りの調整が必要でしたが、それ以外の変更がほとんど必要ないため、前回のAirflowのバージョンアップよりは概ねスムーズに完了しました。一方で、KubernetesPodOperatorのログ出力を抑制している状態なので、bugfixが適用されたバージョンに移行するか、出力先を変えてロギングを再開する必要があります。

また、現時点(2023年2月下旬)でCloud Composer 2の環境は安定稼働を続けており、Cloud Composer 1の時に頻発していたタスクの実行待ちで渋滞を起こす(結果DAG Runがタイムアウトを起こす)ようなことも起きていません。KubernetesPodOperatorを使うタスクの増加によるオートスケーリングの効果については今後注視していきたいと思います。

今回の移行作業をもってAirflowとCloud Composerがともに最新のメジャーバージョンとなりました。当面はマイナーバージョンアップとなり大きな変更はないものと思われますが、次の大きなバージョンアップに備えて変更に強いDAG設計を模索していきます。