minne 動画 AWS パブリッククラウド Shoryuken

Rails × AWS × Shoryukenで実現する大規模動画処理システム

minne 動画 AWS パブリッククラウド Shoryuken
  1. はじめに
  2. セキュアなアップロードとS3の活用
    1. 1. S3バケットの作成と設定
    2. 2. 署名付きURLの生成
    3. 3. クライアントサイドの実装
  3. ShoryukenとSQSによる非同期ジョブ処理
    1. 1. Shoryukenとは
    2. 2. セキュリティスキャンワーカーの実装
    3. 3. 動画変換ワーカーの実装
    4. 4. エラーハンドリング
  4. Lambdaによるワークフロー制御
  5. まとめと今後の展望

はじめに

こんにちは!GMOペパボでエンジニアをしている@yumuです。

近年、動画コンテンツの重要性は急速に高まっており、多くのウェブサービスやアプリケーションで動画のアップロードと共有が一般的になっています。しかし、動画ファイルの取り扱いには、セキュリティ、ファイルサイズ、形式の互換性など、さまざまな技術的課題があります。

今回、私はハンドメイドECサービス「minne」において、作家が作品に関する動画をアップロードできる機能を開発しました。アップロードされた動画を効率よく配信するためには、動画を適切なフォーマットに変換・圧縮する必要があります。動画処理システムを実装するにあたり、AWSの各種サービスとRuby用のバックグラウンドジョブフレームワークであるShoryukenを組み合わせたアーキテクチャを採用しました。

システム全体図

具体的には、Amazon S3を動画ストレージとして使用し、Amazon SQSでタスクキューを管理します。AWS Lambdaでワークフロー全体を制御し、AWS EKS上で動作するShoryukenワーカーでセキュリティスキャンと動画変換処理を実行します。ユーザーは署名付きURLを用いて動画をS3に直接アップロードすることができるようになっています。

この記事では、システムの詳細な設計と実装について、具体的なコードや設定例を交えながら説明していきます。

セキュアなアップロードとS3の活用

システムの構築において、最初に取り組んだのはセキュアで効率的なアップロード機能の実装です。従来のアプローチでは、クライアントからアプリケーションサーバーを経由してストレージにファイルをアップロードする方法が一般的でした。しかし、この方法には以下のような課題があります。

  • サーバー負荷:大容量の動画ファイルをアップロードする際、アプリケーションサーバーに大きな負荷がかかります
  • 通信コスト:アプリケーションサーバーを経由することで、不必要な通信量と料金が発生します
  • セキュリティリスク:セキュリティスキャンが済んでいないデータがアプリケーションサーバーを通過することになり、潜在的なセキュリティリスクとなります

そこで、S3へのダイレクトアップロードを実装することで、これらの問題を解決しました。具体的な実装手順は以下の通りです。

1. S3バケットの作成と設定

まず、動画ファイルを保存するためのS3バケットを環境ごとに作成しました。このバケットに対して適切なCORS設定を行い、特定のオリジンからのアクセスを許可します。

[
  {
    "AllowedMethods": [
      "PUT"
    ],
    "AllowedOrigins": [
      "https://minne.com"
    ],
    "ExposeHeaders": []
  }
]

2. 署名付きURLの生成

セキュリティを確保しつつ、クライアントからS3へのダイレクトアップロードを可能にするため、署名付きURLを使用します。アプリケーションサーバーで以下のようなコードで署名付きURLを生成しています。

def generate_direct_upload_url(file_params)
  # ActiveStorage::Blobを作成
  blob = ActiveStorage::Blob.create_before_direct_upload!(
    filename: file_params[:filename],
    byte_size: file_params[:byte_size],
    checksum: file_params[:checksum],
    content_type: file_params[:content_type],
    service_name: 'amazon' # または適切なサービス名
  )

  # 署名付きURLとヘッダー情報を生成
  {
    signed_id: blob.signed_id,
    direct_upload_url: blob.service_url_for_direct_upload,
    direct_upload_headers: blob.service_headers_for_direct_upload.to_json
  }
end

# 使用例
file_params = {
  filename: 'example.mp4',
  byte_size: 1024000,
  checksum: 'checksum_value',
  content_type: 'video/mp4'
}

upload_data = generate_direct_upload_url(file_params)

3. クライアントサイドの実装

クライアントサイドでは、アプリケーションサーバーで生成された署名付きURLを受け取り、S3に直接ファイルをアップロードします。

async function uploadFileToS3(directUploadUrl, directUploadHeaders, file) {
  const headers = JSON.parse(directUploadHeaders);
  
  try {
    const response = await fetch(directUploadUrl, {
      method: 'PUT',
      headers: headers,
      body: file
    });

    if (!response.ok) {
      throw new Error('Upload failed');
    }

    return 'File uploaded successfully';
  } catch (error) {
    console.error('Upload error:', error);
    throw error;
  }
}

// 使用例
const directUploadUrl = 'https://your-s3-bucket.s3.amazonaws.com/path/to/file';
const directUploadHeaders = '{"Content-Type": "video/mp4"}';
const fileToUpload = new File([''], 'example.mp4', { type: 'video/mp4' });

uploadFileToS3(directUploadUrl, directUploadHeaders, fileToUpload)
  .then(result => console.log(result))
  .catch(error => console.error('Upload failed:', error));

この実装により、大容量のファイルでもアプリケーションサーバーに負荷をかけずにアップロードすることが可能になりました。

ShoryukenとSQSによる非同期ジョブ処理

動画のアップロードが完了したら、動画ファイルに対してマルウェアの検査と変換・圧縮処理を実行します。これらのタスクは時間がかかるため、非同期で実行する必要があります。今回は非同期タスクのジョブキューとしてAWS SQSを、メッセージプロセッサーとしてShoryukenを採用しました。

1. Shoryukenとは

Shoryukenは、RubyでSQSを利用するための効率的なバックグラウンドジョブ処理システムです。以下のような特徴があります。

  • AWSサービスとの親和性: S3、Lambda等の他のAWSサービスとの連携が容易です
  • キューごとの柔軟な並列処理: 各キューに対して独立して並列数を設定できるため、タスクの特性に応じたリソース配分が可能です

Sidekiqも検討しましたが、上記の特徴が今回の動画処理システムに適していると考え、Shoryukenを採用しました。

2. セキュリティスキャンワーカーの実装

ユーザーからアップロードされたコンテンツは、セキュリティ上の理由からセキュリティスキャンが必須です。動画ファイルも例外ではありません。

今回は、clambyというgemを使用してセキュリティスキャンを行うワーカーを実装しました。Clambyは、オープンソースのアンチウイルスエンジンであるClamAVをRubyから簡単に利用するためのgemです。このワーカーは、SQSからメッセージを受け取り、指定されたS3オブジェクトをダウンロードしてセキュリティスキャンを実行します。また、その結果に基づいて、S3オブジェクトにタグを付与(例: 'virus_checked: clean' または 'virus_checked: infected')します。

clambyを利用したセキュリティスキャンの実装例:

require 'clamby'

file = '/path/to/downloaded/file.mp4'
scan_result = Clamby.safe?(file)

S3にアップロードされた動画ファイルを自動的に処理するため、S3のイベント通知機能を利用しています。S3バケットで、オブジェクト作成イベントをセキュリティスキャン用のSQSキューに送信するようにバケットプロパティを設定しました。

3. 動画変換ワーカーの実装

セキュリティスキャンを通過したファイルに対して、変換処理を行います。このワーカーもセキュリティスキャンのワーカーと同様に、SQSからメッセージを受け取り、指定されたS3オブジェクトをダウンロードして動画変換を実行します。変換後の動画を新しいS3オブジェクトとしてアップロードし、元のオブジェクトに変換完了のタグ(例: 'converted: 2024-08-01')を付与します。

動画変換にはstreamio-ffmpeg gemを使用しています。このgemは、FFmpegをRubyから簡単に利用するためのラッパーライブラリです。FFmpegは非常に柔軟で高機能な動画処理ツールであり、様々な形式の動画を扱うことができます。今回は、動画を表示する画面ごとに適したCRF(品質と圧縮率のバランスを調整するパラメーター)を指定し、2種類のMP4ファイルに変換しました。

streamio-ffmpegを使用した動画変換の実装例:

require 'streamio-ffmpeg'

file = '/path/to/downloaded/file.mp4'
movie = FFMPEG::Movie.new(file)
options = {
  video_codec: 'libx264',
  audio_codec: 'aac',
  audio_bitrate: '128k',
  custom: %w(-crf 23)
}
movie.transcode('output.mp4', options)

また、EKS上でShoryukenワーカーを運用する際、適切なリソース割り当ても重要です。ワーカーのメモリやCPUの実際の使用量を測定し、Kubernetesのリソース制限と要求を適切に設定しました。これにより、リソースの無駄を省きつつ、安定した処理を実現することができました。

4. エラーハンドリング

SQSのデッドレターキュー機能を利用して、継続的に処理に失敗するメッセージを別のキューに移動させています。また、デッドレターキューにメッセージが入るとAmazon CloudWatchとAWS Chatbotを使用してSlack通知するようにしています。この実装により、一時的なエラー(ネットワーク障害など)の場合は自動的にリトライし、継続的に失敗する場合は別途調査できるようになっています。

Lambdaによるワークフロー制御

動画のアップロード、セキュリティスキャン、変換という一連のプロセスを効率的に管理するため、Lambdaを使用してワークフロー全体を制御しています。この設計の主な目的は責務の分離です。セキュリティスキャンや動画変換のワーカーはそれぞれのタスクに専念し、Lambda関数はワークフローの調整役として、スキャン結果の確認、次のステップへの移行判断、通知の送信などを担当します。この責務分離により、システムの柔軟性と保守性が向上しています。

S3のイベント通知機能を使って、オブジェクトに対するタグ追加イベントをLambda関数にトリガーしています。

import json
import boto3

def lambda_handler(event, context):
  s3_client = boto3.client('s3')

  bucket = event['Records'][0]['s3']['bucket']['name']
  key = event['Records'][0]['s3']['object']['key']
    
  try:
    tags = s3_client.get_object_tagging(Bucket=bucket, Key=key)['TagSet']
        
    if is_infected(tags):
      # Slack通知のロジック(省略)
    elif is_clean_and_not_converted(tags):
      # SQSにメッセージを送信するロジック(省略)
    else:
      print(f"No action needed for object: {key}")
      return {'statusCode': 200, 'body': json.dumps('Processed successfully')} 
  except Exception as e:
    print(f"Error processing object {key}: {str(e)}")
    return {'statusCode': 500, 'body': json.dumps(f'Error: {str(e)}')}

def is_infected(tags):
  return any(tag['Key'] == 'virus_checked' and tag['Value'] == 'infected' for tag in tags)

def is_clean_and_not_converted(tags):
  is_clean = any(tag['Key'] == 'AV_STATUS' and tag['Value'] == 'PASSED' for tag in tags)
  is_not_converted = not any(tag['Key'] == 'converted' for tag in tags)
  return is_clean and is_not_converted

このLambda関数は、S3オブジェクトのタグに基づいて処理の分岐を行っています。セキュリティスキャンを通過した場合は動画変換ワーカーのSQSキューにメッセージを送信し、マルウェアが検出された場合はSlackに通知を送ります。

まとめと今後の展望

今回は、AWSのサービスとShoryukenを組み合わせた動画処理システムを実装しました。今回の取り組みを通じて、以下のような知見を得ることができました。

  • AWSのマネージドサービスを適切に組み合わせることで、複雑な処理フローも効率的に実装できること
  • 非同期処理とキューイングシステムの重要性、特に長時間処理や大容量データを扱う際の有効性

このプロジェクトが始まった当初は動画処理に関する知識がほとんどない状態でしたが、周囲のエンジニアに教わりながらプロジェクトを進めてきました。半年前にminneに配属されてから初めての設計から実装・運用まで一貫して携わったプロジェクトで、無事ユーザー提供できてとても嬉しいです!

今後は、動画サムネイルの自動生成機能の実装、S3ストレージクラスの最適化によるコスト削減、OpenTelemetryを活用したジョブ実行時間の詳細な計測と分析などの改善を検討しています。