翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Pipes のソースとしての Amazon DynamoDB EventBridge ストリーム
EventBridge Pipes を使用して、DynamoDB ストリームでレコードを受信できます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、ターゲットに送信して処理できます。パイプを設定するときに選択できる Amazon DynamoDB Streams 固有の設定があります。 EventBridge Pipes は、そのデータを宛先に送信するときに、データストリームからのレコードの順序を維持します。
重要
パイプのソースである DynamoDB ストリームを無効にすると、ストリームを再度有効にしても、そのパイプは使用できなくなります。この理由は、以下のとおりです。
ソースが無効になっているパイプを停止、開始、または更新することはできません。
作成後のパイプを新しいソースで更新することはできません。DynamoDB ストリームを再度有効にすると、そのストリームには新しい Amazon リソースネーム (ARN) が割り当てられ、パイプに関連付けられなくなります。
DynamoDB ストリームを再度有効にする場合は、ストリームの新しい を使用して新しいパイプを作成する必要がありますARN。
イベントの例
次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes でのイベントフィルタリング」を参照してください。
[ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable", "eventSource": "aws:dynamodb" } ]
ポーリングストリームとバッチストリーム
EventBridge は、DynamoDB ストリーム内のシャードを 1 秒あたり 4 回の基本レートでレコードにポーリングします。レコードが使用可能になると、 はイベント EventBridge を処理し、結果を待ちます。処理が成功すると、 はより多くのレコードを受信するまでポーリング EventBridge を再開します。
デフォルトでは、 はレコードが利用可能になるとすぐにパイプを EventBridge 呼び出します。ソースから EventBridge 読み取るバッチにレコードが 1 つだけ含まれている場合、処理されるイベントは 1 つだけです。少数のレコードを処理しないようにするには、バッチ処理ウィンドウを設定して、最大 5 分間レコードをバッファリングするようにパイプに指示できます。イベントを処理する前に、 は、完全なバッチが収集されるか、バッチ処理ウィンドウが期限切れになるか、バッチがペイロード制限の 6 MB に達するまで、ソースからのレコードの読み取り EventBridge を続行します。
また、各シャードから複数のバッチを並行して処理することで、並行性を高めることもできます。 EventBridge は、各シャードで最大 10 個のバッチを同時に処理できます。シャードあたりの同時バッチ数を増やすと、 EventBridge ストリルはパーティションキーレベルでの順序付けられた処理を確実にします。
ParallelizationFactor
設定を使用することで、複数のパイプの同時実行により、Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。1 (デフォルト) から 10 までの並列化係数を使用して、シャードから EventBridge ポーリングする同時バッチの数を指定できます。例えば、 ParallelizationFactor
を 2 に設定すると、最大 200 回の EventBridge 同時パイプ実行で 100 個の Kinesis データシャードを処理できます。これにより、データボリュームが揮発性で IteratorAge
が高いときに処理のスループットをスケールアップすることができます。Kinesis 集約を使用している場合、並列化係数は機能しません。
ポーリングとストリームの開始位置
パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。
パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。
ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。
つまり、LATEST
をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON
として指定します。
バッチ項目の失敗の報告
がソースからのストリーミングデータを EventBridge 消費して処理する場合、デフォルトではバッチのシーケンス番号が最も高いチェックポイントになりますが、バッチが完全に成功した場合に限ります。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。
詳細については、「部分的なバッチ処理失敗」を参照してください。
成功条件と失敗の条件
次のいずれかを返すと、 はバッチを完全に成功として EventBridge 処理します。
空の
batchItemFailure
リストnull の
batchItemFailure
リスト空の
EventResponse
null の
EventResponse
次のいずれかを返すと、 はバッチを完全に失敗として EventBridge 処理します。
空の文字列
itemIdentifier
ヌル
itemIdentifier
不正なキー名を持つ
itemIdentifier
EventBridge は、再試行戦略に基づいて失敗を再試行します。