データエンジニアリング

Debezium ServerによるCDCログの抽出状態をOpenTelemetryで可観測にする

データエンジニアリング

こんにちは

技術部データ基盤チームの染矢です。日本では暑さが本格的になってきましたね。最近の私はアイスの実が主食になりました。

この記事では、Debezium Serverによる変更データキャプチャ(CDC)ログの抽出の状態を観測可能にした技術について紹介します。
この方法によって、Debezium Serverコンテナが標準出力に出すログだけでなく、内部で計測されるメトリクスや自動計装されるトレースをOTel Collector経由で任意のオブザーバビリティバックエンドに集められます。

この記事で書くこと

  • JVM環境におけるOpenTelemetryについて
  • Debezium Server内部で実装されている計装について
  • OTel CollectorからAmazon CloudWatchにテレメトリを送信する方法について

この記事で書かないこと

  • Debezium Server、OpenTelemetry、Amazon CloudWatch自体の説明

背景

ペパボのCDC

ペパボでは、ニアリアルタイムなデータ分析を目的に、各サービスのデータベースに対してCDCを実施しています。
CDCとは、データベースに生じたデータの変更を捕捉することです。

ペパボではCDCを実現する手段としてOSSのDebezium Serverを使用しています。
この技術選定の詳細については過去の記事Debezium ServerによるChange Data Captureの事例紹介をお読みください。

Debezium ServerはECS on EC2でホストされています。CDCログの送信先はCloud Pub/Sub Topicです。

CDCログの経路図

課題 - データ反映遅延

さて、「ニアリアルタイム」と謳っている以上、そのデータ鮮度は維持したい指標です。

以前、なぜかわからないけどCDCログの処理が遅延したことがありました。経路全体を見てDebezium Serverがボトルネックであると判断した後、さまざまなパラメータをいじっているうちになぜかわからないけど直りました。

Debezium Serverコンポーネント内での原因切り分けが難しいと、それだけデータ反映遅延の復旧が難しくなります。そこで、集められるテレメトリーを集めて探索可能にする仕組みの整備を決定しました。

テレメトリーの収集

今回のDebezium Serverのテレメトリー収集には、次の技術を使いました。

計装と出力はOpenTelemetryのJava Agentにさせます。間にOTel Collectorを立たせます。最終的にCloudWatchに保存し、CloudWatch上でクエリできるようにします。

それぞれ詳しく説明していきます。

Java Agent

OpenTelemetry用のJava Agentを有効化することによって、JVMで動くDebezium Serverの内部状態を計装できます。

たとえば次のようなDockerfileを書くと、JVMに関するメトリクスやトレースが収集されます。

FROM quay.io/debezium/server:3.1.3.Final
COPY ./conf/application.properties /debezium/config/application.properties

USER root
RUN curl -L -o /opt/opentelemetry-javaagent.jar \
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v2.18.1/opentelemetry-javaagent.jar && \
    chown jboss /opt/opentelemetry-javaagent.jar && \
    chgrp jboss /opt/opentelemetry-javaagent.jar

ENV JAVA_OPTS="-javaagent:/opt/opentelemetry-javaagent.jar"

USER jboss

なぜこれで機能するのか、一つずつ追っておきましょう。 まず、Debezium ServerはJAVA_OPTS環境変数の値をプロセス起動時に渡します。

https://github.com/debezium/debezium-server/blob/3267fd97f361a77258f84df1079255bd3818573f/debezium-server-dist/src/main/resources/distro/run.sh#L53

JVMで動くプログラムは、-javaagent引数で渡されたエージェントプログラムによって計装されます。

https://docs.oracle.com/en/java/javase/17/docs/specs/man/java.html

-javaagent:jarpath[=options] Loads the specified Java programming language agent. See java.lang.instrument.

https://docs.oracle.com/en/java/javase/17/docs/api/java.instrument/java/lang/instrument/package-summary.html

Provides services that allow Java programming language agents to instrument programs running on the JVM.

OpenTelemetry用のエージェントもあります。

https://opentelemetry.io/docs/zero-code/java/agent/

Debezium Server固有のメトリクス

Debezium Serverはそのソースごとに固有のメトリクスを計装します。

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-monitoring

このようなJMXメトリクスを収集するためには、専用の設定が必要です。

# ./conf/jmx-metrics.yml
# "MilliSecondsBehindSource"というメトリクスをGAUGE型の"debezium.source.lag.ms"として収集する
rules:
  - bean: debezium.mysql:type=connector-metrics,context=streaming,server=*
    mapping:
      MilliSecondsBehindSource:
        metric: debezium.source.lag.ms
        type: gauge
        desc: Replication lag in milliseconds
        unit: ms
FROM quay.io/debezium/server:3.1.3.Final
COPY ./conf/application.properties /debezium/config/application.properties
COPY ./conf/jmx-metrics.yml /opt/jmx-metrics.yml

USER root
RUN curl -L -o /opt/opentelemetry-javaagent.jar \
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v2.18.1/opentelemetry-javaagent.jar && \
    chown jboss /opt/opentelemetry-javaagent.jar && \
    chgrp jboss /opt/opentelemetry-javaagent.jar && \
    chown jboss /opt/jmx-metrics.yml && \
    chgrp jboss /opt/jmx-metrics.yml

ENV JAVA_OPTS="-javaagent:/opt/opentelemetry-javaagent.jar -Dotel.jmx.config=/opt/jmx-metrics.yml"

USER jboss

OTel Collector

Java Agentがテレメトリーを収集したら、OTel Collector経由でCloudWatchに送信させていきます。オブザーバビリティバックエンドとしてCloudWatchを選択した理由は、Debezium ServerがECS on EC2、つまりAWS上で動いていたためです。権限付与方法や実際のトラブルシュートの手間を考慮すると、同一クラウド上で完結するメリットが、他のオブザーバビリティサービスを使うメリットよりも大きいと判断しました。

ECS TaskのサイドカーコンテナとしてOTel Collectorを走らせます。 今回は、CloudWatchにデータを溜めたかったため、AWS Distro for OpenTelemetryを使用しました。

https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-services-adot.html

public.ecr.aws/aws-observability/aws-otel-collector:v0.43.3でpullできるイメージを使用しています。

設定ファイルは次のようにしました。

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
    send_batch_size: 50
  # 今回はdebeziumが計装するメトリクスと、gRPC関連のメトリクスだけを収集したい
  filter/metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - '^debezium\..*'
          - '^rpc\..*'
  resource:
    attributes:
      - key: aws.xray.account_id
        from_attribute: aws.account_id
        action: insert
      - key: aws.xray.region
        value: ${AWS_REGION}
        action: insert

exporters:
  # AWS X-Rayはトレースデータを保存、クエリできるサービス
  awsxray:
    region: ${AWS_REGION}
    no_verify_ssl: false
  # AWS EMF(Embedded Metrics)は構造化ログ経由でカスタムメトリクスを保存できるサービス
  awsemf:
    region: ${AWS_REGION}
    namespace: 'DebeziumServer'
    dimension_rollup_option: NoDimensionRollup
    log_retention: 14
    resource_to_telemetry_conversion:
      enabled: true
    metric_declarations:
      - dimensions: [[service.name, environment]]
        metric_name_selectors:
          - '^debezium\..*'
          - '^rpc\..*'

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, resource]
      exporters: [awsxray]
    metrics:
      receivers: [otlp]
      processors: [batch, resource, filter/metrics]
      exporters: [awsemf]

あとは、Debezium Serverのコンテナに次の環境変数を与えるだけです。

OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318"
OTEL_TRACES_EXPORTER="otlp"
OTEL_METRICS_EXPORTER="otlp"
OTEL_LOGS_EXPORTER="none" # すでにCloudWatch Logsで収集できているので明示的にnoneにしておく
OTEL_RESOURCE_ATTRIBUTES="service.name=debezium-server,environment=production"

これで、Java Agentが収集したテレメトリーが、OTel Collector経由でCloudWatchに保存され、探索可能になります。

メトリクスは次のように可視化できます。今回は、MySQLインスタンスへの同期遅延の時間(ミリ秒)と、Cloud Pub/Sub Topicへの送信にかかる時間(ミリ秒)を取得しています。それらを積み上げたグラフに可視化することで、CDCログの抽出時点でのデータ反映遅延時間を推定できます。この可視化では、Debezium Serverが何度か再起動するたびに、同期遅延時間と送信時間に変化があることがわかります。

メトリクスのスクリーンショット

トレースは次のように探索できます。特定時間範囲におけるレイテンシの分布を見たり、トレース一つずつのエラーログを含む詳細を取得できます。

トレースのスクリーンショット トレース詳細のスクリーンショット

課題

本当は、テーブルごとに、アプリケーションでのCDCデータ処理からCloud Pub/Subへのリクエストのようなトレースデータを収集したかったです。しかし、内部で非同期処理が挟まっており、かつそのようなコンテキストの伝播を含めた計装処理はDebezium Serverがサポートしていないらしいため、今回は断念しました。

Debezium Serverでなく、Kafka Connectorとして機能するDebeziumではそのような分散トレースがサポートされているようなので、リアーキテクチャすることがあったらこのトレース収集を検討しようと思います。

おわり

以上、Debezium ServerによるCDCログ抽出をOpenTelemetryで可観測にする方法でした。ペパボでは、このように収集したストリーミングデータの事業への活用を進めています。データエンジニアリングや機械学習に興味のある方はぜひお声がけください!!!!!!!!