こんにちは。あなただけの彦星になりたい、鹿児島が生んだ三大Hikoの一人、和彦こと、P山 です。
今日はデータ基盤チームで利用しているワークフロー管理プラットフォームのApache Airflow(以降Airflow) で利用しているDAGのCI環境をクラウドネイティブな技術を利用して、リニューアルした実装を紹介します。
DAGというのは Directed Acyclic Graph
の略で有向非巡回グラフ
と訳されます。Airflowではそれぞれのタスクをまとめたものを1つのDAGとして定義します。
多くの方になじみのない言葉でしょうから、この記事内においては「プログラムコード」と脳内変換していただければ読みやすいです。
リニューアル前
データ基盤チームではAirflowの実行基盤としてGCPのCloud Composer を利用しています。
自動テストを行うために、GitHub ActionsのCronをトリガーに、Terraformを利用し、毎朝、2つCI環境を起動して、終業時間に自動でCI環境を削除するような運用をしていました。このような運用をしていた背景として、Cloud ComposerのCI環境の作成には約40分程度かかることと、コスト削減のために、開発していない時間は環境を削除する必要があったためです。
この運用で不便だった点は2つあります。1つ目は共用環境であるかつ、2つの環境しかないため、他の開発者がテストを実行していると待ち時間が発生することです。2つ目は、テスト結果をWEB UIで確認するときの表示が他の開発者の実行結果と混在するので確認がしづらいことです。特に、後者についてはデータ基盤チームと仕事をしはじめた当初、P山自身がAirflowについて無知であったため、とても辛い思いをしました。
これらの課題を解決するために、CI環境をリニューアルしました。
リニューアル後
変更後のアーキテクチャは下記のとおりです。
実行環境はオンプレミスなプライベートクラウド上に構築したKubernetesクラスターです。
どのような流れでテストが実行されるのか説明します。
1. 開発者がプルリクエストを作成する
開発者がGitHub Enterprise Server(以降GHES)にブランチをプッシュし、Pull Requestを作成したタイミングで、ArgoCDのApplicationSetのひとつであるPull Request Generator が、Helmを実行し、Airflowの実行環境を構築します。
Helmについては、公式のHelmチャート を利用していますが、Cloud Composerとの環境差異を埋めたり、GCSのバケットを作成、Service Accountの権限付与などを行う必要があるので、Chart.yamlを作成し、自前のHelmチャートがAirflow公式のHelmチャートに依存して、独自にJobやIngressのリソースを定義できるようにしています。よってディレクトリの構成は下記のようになっています。
% tree
.
├── Chart.lock
├── Chart.yaml
├── Makefile
├── README.md
├── setup.rb
├── templates
│ ├── _helper.tpl
│ ├── ingress
│ │ └── api.yaml
│ ├── job
│ │ └── grant-gcs-access.yaml
│ └── rbac
│ ├── clusterrole.yaml
│ └── rolebinding.yaml
└── values.yaml
manifestで必要な秘匿情報についてはSealed Secrets を利用して管理しています。manifestsの生成の元となるパスワードなどの情報は1Passwordに存在するため、それを取得して、動的にmanifestを生成するセットアップスクリプトを準備しています。1Passwordは op
というCLIを提供しているので、op
を利用して、下記のような実装を行っています。
def get_secrets
@items ||= JSON.parse(`op item list --format json`.chomp)
end
def get_op_secret(name)
unless @values[name]
item = get_secrets.find do |item|
item['title'] == name
end
id = item['id']
@values[name] = if item['category'] == 'DOCUMENT'
`op document get #{id}`.chomp
elsif item['category'] == 'DATABASE'
JSON.parse(`op item get #{id} --format json`.chomp)['fields']
else
JSON.parse(`op item get #{id} --format json`.chomp)['fields'][0]['value']
end
end
@values[name]
rescue StandardError
puts "Error: #{name} not found"
raise
end
またAirflowにはConnectionというリソースがあり、DBへのアクセス情報や、クラウドへのアクセス情報をAirflowに登録する必要があります。それらもセットアップスクリプトで動的に生成しています。
2. Airflowへの開発者のアクセス
Airflowへのアクセス方法はWEB UIとWEB APIがあります。前者は自動テストが失敗したときなどに開発者が確認するエンドポイントで、後者はGitHub Actionsで実行されるテスト実行スクリプトがDAGを実行操作するためのエンドポイントです。
WEB UIについてはOAuth2 Proxy を利用して、認証を行っており、WEB APIについてはIP認証およびBasic認証を行っています。これらを実現するに当たり、Nginx Ingress、ExternalDNS、cert-managerを利用しています。
Ingressについては今回のCI環境のように、通常は社外ユーザーがCookieが操作ができない非公開サービスにおいてはOAuth2 Proxyをクッキードメインで共用するケースがあるため、下記のようにAnnotationを定義しています。
annotations:
nginx.ingress.kubernetes.io/auth-response-headers: X-Auth-Request-User, X-Auth-Request-Email
nginx.ingress.kubernetes.io/auth-url: https://oauth2.example.com/oauth2/google/auth
nginx.ingress.kubernetes.io/configuration-snippet: |
auth_request_set $auth_cookie $upstream_http_set_cookie;
error_page 401 =307 https://oauth2.example.com/oauth2/google/sign_in?rd=$scheme://$http_host$request_uri;
このように定義すると、https://oauth2.example.com
で稼働するOAuth2 Proxyサーバーを同じクッキードメインの配下で共用できます。
Ingressへアクセスするにあたり、名前解決はExternalDNSを利用して、Route53に自動でAレコードを登録し、TLS証明書についてはcert-managerを利用して、ワイルドカード証明書を発行しています。もともとはIngressにhostname単位でsecretNameを登録して、個別に証明書を発行していたのですが、Pull Requestが大量に出された場合などにレートリミットに到達してしまうケースがあったため、ワイルドカード証明書への運用へ切り替えました。
3. pushに合わせてAirflowのDAGを更新する
開発者がコードをGHESにpush後、自動テストの実行前に、変更されたコードをAirflowに同期する必要があります。その役割はgit-sync が担っています。git-syncはリポジトリを指定した間隔でポーリングして差分があれば、git clone
してくれるソフトウエアです。一般的にはVolume Mountしたディレクトリに対して利用することが多いです。またgit-syncは GITSYNC_EXECHOOK_COMMAND
を利用して、git clone
のあとに任意のコマンドを実行できます。たとえば、プロセスを再起動したり、後続処理をキックできます。
われわれの環境においては、Cloud Composerとの環境差異を埋めるために、git clone
後にシンボリックリンクを生成したり、GCSのバケットにDAGを転送しています。その理由は、われわれはCloud ComposerにおいてGCSのバケットをFUSEマウントしてディレクトリのように扱っており、DAG内でその前提で実装されている箇所が多数あるのと、バケットに資産があることを前提とした実装があるので、オンプレミスなKubernetesにおいてもGCSにDAGを転送しています。
4. DAGのテストを実行する
AirflowのDAGが更新されたら、GitHub ActionsからAirflowのAPIを実行して、自動テストを実行します。ここで工夫した点としては、旧来のCloud ComposerのCI環境を利用して実行されていたテストスクリプトをストラテジー・パターン で実装し直したことです。その理由は、Cloud Composerのバージョンアップをする際に、Cloud Composerの環境でもテストが必須であろうと考えたためです。Cloud Composerはその実行環境で利用されているPythonのパッケージなどを公式サイトで情報開示 しています。しているのですが、apache-airflow-providers-google==2023.6.6+composer
のように一部のpipパッケージはGCPの非公開なパッチ運用となっているため、その部分の差異を完全にオンプレミスで再現することは難しく、バージョン差異によるインシデントを避けるため、Cloud Composerのバージョンアップ前には、同じ環境でテストするようにしています。そして簡単にテストスクリプトの挙動を切り替えるためにストラテジー・パターンで再実装しました。
なお、apache-airflow-providers-google==2023.6.6+composer
のようなパッケージについては、コミュニティのIssueやGCPの公式アナウンスを考古学的に発掘し、適切なバージョンを選択する必要があります。このあたりはバージョン、利用者ごとに再現性のない考古学が必要になるので、割愛します。
5. リニューアルによって得られたメリット
冒頭にあげた、テストの待ち時間や確認のしづらさの課題を解消できただけでなく、GCPからオンプレミスに変更したことで、キャッシュアウトという意味でのコスト削減や、障害発生時などの夜間においてもCIを実行することが可能になりました。これまでは終業時間に自動でCI環境が削除されていたため、夜間のインシデント発生時は自動テストをスキップしてリリースしたり、40分の時間をかけてCI環境を構築し直したりしていました。またデータ基盤チームは少人数のチームではありますが、シニア・エンジニアの比率が高いため、フレキシブルな働き方をしているメンバーが多く、夜間帯においてもCIが実行できるのが非常に便利です。この点、誤解を招かないように書くと、当社は22時以降は夜間勤務となるため、裁量労働制の業務従事者においても事前の申請が必要です。一方でそれ以外の時間においては上長との同意の元、職位別の制限に基づき裁量のある働き方が可能です。
最後に
この記事で紹介したCI環境の改善は主にP山が主となって進めた内容ですが、データ基盤チームのメンバーも移行期間、多少の不都合がありながらもどんどん作業を前に進めてくれましたし、最近ではKubernetesのmanifestも書いてくれるようになりました。
紹介したようなデータ基盤の開発、改善、MLの取り組みなどまだまだやりたいことがたくさんあるので、ぜひ僕たちと一緒にデータエンジニアリングで事業を変革したいという方、ご連絡お待ちしています!!1