Pipes のソースとしての Amazon Kinesis EventBridge ストリーム - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Pipes のソースとしての Amazon Kinesis EventBridge ストリーム

EventBridge Pipes を使用して、Kinesis データストリームでレコードを受信できます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。パイプをセットアップするときに選択できる Kinesis 固有の設定があります。 EventBridge Pipes は、データを送信先に送信するときに、データストリームからのレコードの順序を維持します。

Kinesis データストリームは、シャードのセットです。各シャードには、一連のデータレコードが含まれます。コンシューマーは、Kinesis データストリームからのデータを処理するアプリケーションです。 EventBridge Pipe は、共有スループットコンシューマー (標準イテレーター)、または拡張ファンアウト を備えた専用スループットコンシューマーにマッピングできます。

標準イテレーターの場合、 HTTPプロトコル EventBridge を使用して Kinesis ストリーム内の各シャードにレコードをポーリングします。このパイプでは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成できます。ストリームコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。専用のスループットは、多数のアプリケーションで同じデータを読み取っている場合や、大きなレコードでストリームを再処理する場合に役立ちます。Kinesis は HTTP/2 EventBridge 経由でレコードを にプッシュします。Kinesis Data Streams の詳細については、「Amazon Kinesis Data Streams からのデータの読み取り」を参照してください。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes でのイベントフィルタリング」を参照してください。

[ { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ]

ポーリングストリームとバッチストリーム

EventBridge は、Kinesis ストリーム内のシャードを 1 秒あたり 4 回の基本レートでレコードに対してポーリングします。レコードが使用可能になると、 はイベント EventBridge を処理し、結果を待ちます。処理が成功すると、 はより多くのレコードを受信するまでポーリング EventBridge を再開します。

デフォルトでは、 はレコードが利用可能になるとすぐにパイプを EventBridge 呼び出します。ソースから EventBridge 読み取るバッチにレコードが 1 つだけ含まれている場合、処理されるイベントは 1 つだけです。少数のレコードを処理しないようにするには、バッチ処理ウィンドウを設定して、最大 5 分間レコードをバッファリングするようにパイプに指示できます。イベントを処理する前に、 は、完全なバッチが収集されるか、バッチ処理ウィンドウが期限切れになるか、バッチがペイロード制限の 6 MB に達するまで、ソースからのレコードの読み取り EventBridge を続行します。

また、各シャードから複数のバッチを並行して処理することで、並行性を高めることもできます。 EventBridge は、各シャードで最大 10 個のバッチを同時に処理できます。シャードあたりの同時バッチ数を増やすと、 EventBridge ストリルはパーティションキーレベルでの順序付けられた処理を確実にします。

ParallelizationFactor 設定を使用することで、複数のパイプの同時実行により、Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。1 (デフォルト) から 10 までの並列化係数を使用して、シャードから EventBridge ポーリングする同時バッチの数を指定できます。例えば、 ParallelizationFactorを 2 に設定すると、最大 200 回の EventBridge 同時パイプ実行で 100 個の Kinesis データシャードを処理できます。これにより、データボリュームが揮発性で IteratorAge が高いときに処理のスループットをスケールアップすることができます。Kinesis 集約を使用している場合、並列化係数は機能しません。

ポーリングとストリームの開始位置

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。

  • パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。

  • ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、LATEST をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

バッチ項目の失敗の報告

がソースからのストリーミングデータを EventBridge 消費して処理する場合、デフォルトではバッチのシーケンス番号が最も高いチェックポイントになりますが、バッチが完全に成功した場合に限ります。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。

詳細については、「部分的なバッチ処理失敗」を参照してください。

成功条件と失敗の条件

次のいずれかを返すと、 はバッチを完全に成功として EventBridge 処理します。

  • 空の batchItemFailure リスト

  • null の batchItemFailure リスト

  • 空の EventResponse

  • null の EventResponse

次のいずれかを返すと、 はバッチを完全に失敗として EventBridge 処理します。

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

EventBridge は、再試行戦略に基づいて失敗を再試行します。