データ基盤

Google Cloud Composerをcomposer-1.17.6-airflow-2.1.4にバージョンアップしました

データ基盤

技術部データ基盤チームの@tosh2230です。

昨年2021年12月に、ペパボのデータ基盤であるBigfootのメインコンポーネントであるGoogle Cloud Composer(以下、Cloud Composer)をバージョンアップし、 Apache Airflow(以下、Airflow)のバージョンは1.10.14から2.1.4になりました!

バージョンアップの目的や、作業の過程で苦労したポイントなどをブログにまとめます。

  1. Cloud Composerとは
  2. バージョンアップの目的
  3. バージョンアップ検証
  4. Step 1:composer-1.17.3-airflow-1.10.15
  5. Step 2:composer-1.17.6-airflow-2.1.4
    1. CLIコマンドが一部変わっている
    2. プリインストールされているPyPIパッケージが変わっている
    3. 各種Operatorのパッケージ構造が変わっている
    4. BigQueryに対するDelete操作でプロジェクトの指定が必須になっている
    5. AirflowデータベースがMySQLからPostgreSQLに変わっている
    6. メンテナンスウィンドウがデフォルト設定される
  6. 切り替え作業
  7. 未解決課題
    1. catchup=Falseにするとexecution_dateがずれるパターンがある
    2. DuplicatedOperatorが残っている
  8. バージョンアップが終わるとどうなる?

Cloud Composerとは

Google Cloudが提供するCloud Composerは、Airflowをベースとしたフルマネージドワークフローオーケストレーションサービスです。

下図はBigfootの概要を示しており、右下にあるのがCloud Composerです。Bigfoot内外のデータ操作をコントロールする役目を負っています。

bigfoot.png

バージョンアップの目的

作業前後のバージョンは以下のとおりです。

Stage Image Cloud Composer Airflow Python
バージョンアップ前 composer-1.15.1-airflow-1.10.14 1.15.1 1.10.14 3.6.10
バージョンアップ後 composer-1.17.6-airflow-2.1.4 1.17.6 2.1.4 3.8.12

Airflow v2のリリースブログに"Massive Scheduler performance improvements"とあるように、Airflow v2ではスケジューラの性能が大幅に強化されています。

キューに入ったタスクが開始するまで、Airflow v1.10.14では最長で5分ほどかかっていたのですが、v2.1.4に上げたところ数秒に短縮され、全体的なDAG実行時間を短縮することができました。

また、Cloud Composerを構成するコンポーネントのバージョンが全体的に新しくなったことで、それぞれに関連する既知の脆弱性に対応できたことも、バージョンアップによる恩恵のひとつです。

Cloud Composerは、AirflowやGoogle Kubernetes Engine、Google Cloud SQL、そして多くのPyPIパッケージの組み合わせで構成されています。 IAMやIdentity-Aware Proxyによってリソースが保護されているものの、多層防御を意識して各コンポーネントのバージョンをできるだけ最新に近づけておくのが理想的と考えています。

バージョンアップ検証

Cloud Composerバージョンアップの際は、検証用の環境を別途用意して、事前検証を行うようにしています。 GCPのドキュメントで、Cloud ComposerをAirflow v2に移行する手引き(Migrate environments to Airflow 2)が公開されていましたので、まずはこちらに目を通しました。

これによると、Upgrade checkと呼ばれるチェック処理があり、それを実行するにはAirflowをv1.10.15にする必要がある、と書かれています。

加えて、Airflowのメジャーバージョンをはじめ、複数のコンポーネントのバージョンが上がることを踏まえて、下記のように二段階に分けて検証することにしました。

Step 1: Cloud Composerのマイナーバージョンを上げる

Step 2: Airflowのメジャーバージョン、Pythonのマイナーバージョンを上げる

Step Image Cloud Composer Airflow Python
Step 0 composer-1.15.1-airflow-1.10.14 1.15.1 1.10.14 3.6.10
Step 1 composer-1.17.3-airflow-1.10.15 1.17.3 1.10.15 3.6.10
Step 2 composer-1.17.6-airflow-2.1.4 1.17.6 2.1.4 3.8.12

Step 1:composer-1.17.3-airflow-1.10.15

composer-1.17.3-airflow-1.10.15では、何事もなくDAGは正常終了しました。 そしてUpgrade checkを実行してみました。

Found 0 problems.

Not found any problems. World is beautiful.
You can safely update Airflow to the new version.

_人人人人人人人人人人_
> World is beautiful <
 ̄Y^Y^Y^Y^Y^Y^Y^Y^Y ̄

Step 2:composer-1.17.6-airflow-2.1.4

お墨付きを得たためStep2へ進みましたが、Cloud Composer環境はなかなか作れず、DAGが動くようになるまでしばらく時間を要しました。

ここからは、バージョン間の差異を埋めるべく変更したポイントを抜粋してご紹介します。

CLIコマンドが一部変わっている

Airflow2になると、CLIのコマンドが一部変更になります。

Cloud Composer環境の構築やDAG起動を行うスクリプトでCLIコマンドを使っていましたので、コマンドが違うというエラーが発生しました。 今回は、Airflowバージョンによって実行するコマンドを変えるようにスクリプトを修正して対応しました。

CLIの変更点は下記のページにまとまっています。

Airflow CLI changes in 2.0

プリインストールされているPyPIパッケージが変わっている

Airflow v2になると、Cloud ComposerにプリインストールされるPyPIパッケージが変わります。

Cloud Composer version listのページに、Cloud Composerイメージごとにどのパッケージが入っているのかを確認できますので、このパッケージリストと、現在利用しているパッケージを比較するのがよいと思います。

またProvider packagesの追加では、extrasを正確に指定しないとインストールできません(ごくまれに成功するときもあるのが困りものです)。

これは、上記のバージョンリストのページに記載されているプリインストールパッケージの中から、各Provider PackageのページにあるCross provider package dependenciesに含まれる依存パッケージをextrasに列挙するとインストールできます。

今回は、AmazonとSlackのパッケージを追加したかったので、以下のように指定しました。

apache-airflow-providers-amazon[cncf.kubernetes, ftp, google, imap, mysql, ssh]==2.5.0
apache-airflow-providers-slack[http]==4.1.0

各種Operatorのパッケージ構造が変わっている

Cloud Composerイメージの変更に起因して、一部のPythonパッケージの構造が変わっているものがあります。 Airflow v1と共通のソースコードで管理したかったので、一時的にairflow.versionの値を条件に、インポートパスを変更する方針にしました。

条件分岐させたパッケージの例を以下に挙げます。

from airflow.version import version as airflow_version

if airflow_version.startswith('2.'):
  from airflow.kubernetes.secret import Secret
  from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
  from airflow.sensors.external_task import ExternalTaskSensor
  from pendulum import DateTime as Pendulum
else:
  from airflow.contrib.kubernetes.secret import Secret
  from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
  from airflow.operators.sensors import ExternalTaskSensor
  from pendulum import Pendulum

BigQueryに対するDelete操作でプロジェクトの指定が必須になっている

BigqueryInsertJobOperatorやBigQueryTableDeleteOperatorなど、BigQueryを操作するOperatorはいくつかありますが、それらを使ってレコードの削除(SQLのDELETE文)を実行する際に、GCPプロジェクトを明示しなければエラーになる、という仕様に変わっていました。

これまではデフォルトプロジェクトを設定してコードでの記述を省略していたため、DELETEを実行している処理はすべて、プロジェクトを明示的に指定するように変更しました。

AirflowデータベースがMySQLからPostgreSQLに変わっている

大胆なことに、AirflowデータベースがMySQLからPostgreSQLに変わっていました。

DAGの中にAirflowデータベースを直接参照するものがあり、それを検証環境で実行したところMySqlHookでつながりませんでした。 それはそうです、PostgreSQLになっているのですから…

Airflow Connections "airflow_db"のconn_typeが、postgresになっているのを見て気づきました。 PostgresHookに変更すると正常終了しました。参照しているテーブルのスキーマに変更がなかったのは救いでした。

メンテナンスウィンドウがデフォルト設定される

Cloud Composerのプレビュー機能のひとつにメンテナンスウィンドウの指定があります。

composer-1.17.6-airflow-2.1.4では、メンテナンスウインドウがデフォルトで有効になっており、一度環境を作成した後はオフにできませんでした。

メンテナンスウインドウを指定しなければ、DAGの実行状況にかかわらずメンテナンスが開始されてしまうため、時間帯を指定するのがよいと思います。 メンテナンスウインドウは、週あたり12時間以上確保するように設定する仕様です。

切り替え作業

バージョンアップ作業では、既存の環境のバージョンをそのまま上げるのではなく、新環境を作成して特定の日時を境に切り替える方法を取りました。

具体的には、現環境のDAGにend_dateを設定し、その次回にあたる実行日時をstart_dateとして新環境のDAGに設定しました。

この方法をとることにより、検証用の環境と同じ手順で新環境を構築できますので、バージョンアップ作業による不測の事態を避けることができます。

未解決課題

新環境への切り替え作業は概ねうまくいき、新バージョンでも順調に稼働しています。

しかしながら、下記のような未解決課題が残っていますので、折をみて調査・改善していこうと思っています。

catchup=Falseにするとexecution_dateがずれるパターンがある

これは、バージョンアップ前のcomposer-1.15.1-airflow-1.10.14でも起きていた事象です。

AirflowにはCatchupという、過去に実行していないDAG Run(DAGの定義から実行周期ごとにインスタンス化したもの)をさかのぼって実行する機能があります。 例えば今日、2020年2月1日をstart_dateとするDaily DAGをcatchup=Trueでデプロイすると、過去2年分の処理がスケジューリングされます。

こうした仕様は事故の元になりやすいため、catchup=Falseに設定しましたが、毎時でスケジュールしたDAGのexecution_dateが、指定時間からミリ秒のレベルで少しずつずれてしまう、という事象に遭遇しました。

execution_dateがずれることにより、その下流のDAGにセットしたExternalTaskSensorが反応しなくなるため、DAGが順番に起動することを前提としたワークフローを組んでいる私たちにとって、影響が大きい問題でした。 この事象は、catchup=TrueにしてDAGを再デプロイすると解消します。Intervalが日次以上のDAGでは、この事象は確認されていません。

そして、catchup=Trueに設定を戻したことで、start_dateの指定ミスが起きると過去分のDAG Runが実行されてしまう可能性が残りますが、その対策として、GitHub ActionsでのCIにstart_dateのチェック処理を組み込んでいます。 スケジュール変更が行われたDAGのstart_dateが、未来の日時になっているかを確認することで、デプロイ直後に過去分のDAG Runが実行されてしまうことを予防しています。

DuplicatedOperatorが残っている

BigqueryOperatorやBigQueryExecuteQueryOperatorといったDuplicatedOperatorを使用しているDAGが残っています。バージョンアップの際にまとめて変更するのは大変ですので、少しずつ最新のOperatorに置き換えていこうと思います。

バージョンアップが終わるとどうなる?

2021年12月16日に、composer-2.0.0-airflow-2.1.4がリリースされました。次はCloud Composerのメジャーバージョンアップです。

Cloud Composer 2の特徴は、Airflow WorkerがGKE Autopilotで動作するためにスケーリングがより柔軟になることです。Cloud Composer 1である現在は、時間帯に応じてワーカーノード数を増減させていますが、需要に応じてスケールイン・アウトしてくれると、運用の手間が減ってとても助かります。

今回のバージョンアップでその恩恵の大きさを体感できましたので、今後はもっと気軽にバージョンアップできるように日々の運用を工夫していきたいと思います。