DynamoDB と Amazon EventBridge の統合 - Amazon DynamoDB

DynamoDB と Amazon EventBridge の統合

Amazon DynamoDB は、変更データキャプチャのための DynamoDB Streams を提供し、DynamoDB テーブルでの項目レベルの変更のキャプチャを行うことができます。DynamoDB Streams は Lambda 関数を呼び出してこれらの変更を処理することができるため、他のサービスやアプリケーションとのイベント駆動型の統合が可能になります。DynamoDB Streams はフィルタリングもサポートしているため、効率的でターゲットを絞ったイベント処理が可能になります。

DynamoDB Streams は、各シャードで最大 2 つの同時コンシューマーをサポートし、Lambda イベントフィルタリングによるフィルタリングをサポートして、特定の条件に一致する項目のみが処理されるようにします。一部のお客様は、3 つ以上のコンシューマーをサポートする要件がある場合があります。また他のお客様は、変更イベントを処理する前にエンリッチメントが必要な場合や、より高度なフィルタリングとルーティングが必要な場合もあります。

DynamoDB を EventBridge を統合することで、これらの要件をサポートできます。

Amazon EventBridge は、イベントを使用してアプリケーションコンポーネント同士を接続するサーバーレスサービスです。これにより、スケーラブルなイベント駆動型アプリケーションを簡単に構築できます。EventBridge は、EventBridge Pipes を介して Amazon DynamoDB とネイティブに統合されるため、DynamoDB から EventBridge バスへのシームレスなデータフローが可能になります。その後、バスは一連のルールとターゲットを通じて複数のアプリケーションやサービスにファンアウトできます。

仕組み

DynamoDB パイプと EventBridge パイプの統合では、DynamoDB Streams を使用して、DynamoDB テーブル内の項目レベルの変更の時系列シーケンスをキャプチャします。この方法でキャプチャされた各レコードには、テーブルで変更されたデータが含まれます。

DynamoDB Streams と Amazon EventBridge バスの統合方法を示す画像。

EventBridge パイプは DynamoDB Streams からのイベントを消費し、EventBridge バスなどのターゲットにルーティングします (イベントバスはイベントを受信して送信先に配信するルーターで、ターゲットとも呼ばれます)。配信は、イベントの内容に一致するルールに基づいています。このパイプには、特定のイベントをフィルタリングするオプションや、ターゲットに送信される前のイベントデータのエンリッチメントオプションも含まれています。

EventBridge は複数のターゲットタイプをサポートしていますが、ファンアウト設計を実装する場合の一般的な選択肢は、ターゲットとして Lambda 関数を使用することです。次の例では、Lambda 関数ターゲットとの統合の例を示します。

コンソールを使用した統合の作成

AWS Management Console を使用して統合を作成するには、以下の手順に従います。

  1. DynamoDB デベロッパーガイドの「ストリームの有効化」セクションの手順に従って、ソーステーブルで DynamoDB Streams を有効にします。ソーステーブルで DynamoDB Streams がすでに有効になっている場合は、コンシューマーの数が 2 つ以下であることを確認します。コンシューマーには、Lambda 関数、DynamoDB グローバルテーブル、Amazon DynamoDB と Amazon OpenSearch Service のゼロ ETL 統合、または DynamoDB Streams Kinesis Adapter などのストリームから直接読み取るアプリケーションなどが含まれます。

  2. 「EventBridge ユーザーガイド」の「Creating an Amazon EventBridge event bus」セクションの手順に従って EventBridge イベントバスを作成します。

    1. イベントバスを作成する際は、[スキーマ検出] を有効化します。

  3. 「EventBridge ユーザーガイド」の「Creating an Amazon EventBridge pipe」セクションの手順に従って EventBridge パイプを作成します。

    1. ソースの設定では、[ソース] フィールドで [DynamoDB] を選択し、[DynamoDB Streams] フィールドでソーステーブルストリームの名前を選択します。

    2. ターゲットの設定では、[ターゲットサービス] フィールドで [EventBridge イベントバス] を選択し、[ターゲットのイベントバス] フィールドで手順 2 で作成したイベントバスを選択します。

  4. ソース DynamoDB テーブルにサンプル項目を書き込み、イベントをトリガーします。これにより、EventBridge はサンプル項目からスキーマを推測できます。このスキーマを使用して、イベントをルーティングするためのルールを作成できます。例えば、属性のオーバーロードを伴う設計パターンを実装する場合、ソートキーの値に応じて異なるルールをトリガーできます。DynamoDB への項目の書き込み方法の詳細については、「DynamoDB デベロッパーガイド」の「項目と属性の操作」を参照してください。

  5. 「Lambda 開発者ガイド」の「Python による Lambda 関数の構築」セクションの手順に従ってターゲットとして使用する Python Lambda 関数の例を作成します。関数を作成する際は、以下のサンプルコードを使用して統合のデモを行えます。コードが呼び出されると、イベントで受信した NewImageOldImage を表示し、これは CloudWatch Logs で確認することができます。

    import json def lambda_handler(event, context): dynamodb = event.get('detail', {}).get('dynamodb', {}) new_image = dynamodb.get('NewImage') old_image = dynamodb.get('OldImage') if new_image: print("NewImage:", json.dumps(new_image, indent=2)) if old_image: print("OldImage:", json.dumps(old_image, indent=2)) return {'statusCode': 200, 'body': json.dumps(event)}
  6. 「EventBridge ユーザーガイド」の「Creating Amazon EventBridge rules that react to events」セクションの手順に従って、新しい Lambda 関数にイベントをルーティングする EventBridge ルールを作成します。

    1. ルールの詳細を定義する際は、手順 2 で作成したイベントバスの名前を [イベントバス] として選択します。

    2. イベントパターンを構築する際は、既存のスキーマの指示に従います。ここでは、discovered-schemas レジストリおよびイベントで検出されたスキーマを選択できます。これにより、特定の属性に一致するメッセージのみをルーティングするユースケースに固有のイベントパターンを設定できます。例えば、SK が “user#” で始まる DynamoDB 項目のみを一致させる場合は、次のような設定を使用します。

      「user#」で始まるソートキーを持つ DynamoDB 項目のみが表示される EventBridge ルールを示す画像。
    3. スキーマに対するパターンの設計が完了したら、[JSON でイベントパターンを生成する] をクリックします。代わりに DynamoDB Streams に表示されるすべてのイベントにマッチさせる場合は、イベントパターンに次の JSON を使用します。

      { "source": ["aws.dynamodb"] }
    4. ターゲットを選択する際は、AWS サービスの指示に従います。[ターゲットの選択] フィールドで [Lambda 関数] を選択します。[関数] フィールドで、手順 5 で作成した Lambda 関数を選択します。

  7. これで、「EventBridge ユーザーガイド」の「Starting or stopping schema discovery on event buses」セクションの手順に従って、イベントバスでのスキーマの検出を停止することができるようになります。

  8. ソース DynamoDB テーブルに 2 つ目のサンプル項目を書き込み、イベントをトリガーします。イベントが各ステップで正常に処理されたことを確認します。

    1. 「EventBridge ユーザーガイド」の「Monitoring Amazon EventBridge」セクションに従ってイベントバスの CloudWatch メトリクス PutEventsApproximateSuccessCount を確認します。

    2. 「Lambda 開発者ガイド」の「Lambda 関数のモニタリングおよびトラブルシューティング」に従って、Lambda 関数の関数ログを確認します。Lambda 関数で提供されているサンプルコードを使用している場合は、CloudWatch Logs ロググループに DynamoDB Streams の NewImageOldImage が出力されます。

    3. 「Lambda 開発者ガイド」の「Lambda 関数のモニタリングおよびトラブルシューティング」に従って、Lambda 関数のエラー数および成功率 (%) メトリクスを確認します。

次のステップ

これは、単一の Lambda 関数をターゲットとする基本的な統合の例です。複数のルールの作成、複数のターゲットの作成、他のサービスとの統合、イベントのエンリッチメントなど、より複雑な設定をさらに理解するには、EventBridge ユーザーガイド「Getting started with EventBridge」を参照してください。

注記

アプリケーションに関連する可能性のある EventBridge クォータに注意してください。DynamoDB Streams のキャパシティはテーブルに合わせてスケーリングされますが、EventBridge のクォータは別のものです。大規模なアプリケーションで注意すべき一般的なクォータは、1 秒あたりのトランザクションの呼び出しスロットリング制限1 秒あたりのトランザクションの PutEvents スロットリング制限です。これらのクォータは、ターゲットに送信できる呼び出しの数と、1 秒あたりにバスに送信できるイベントの数を指定します。