重複レコードを処理する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

重複レコードを処理する

レコードが複数回 Amazon Kinesis Data Streams アプリケーションに配信される理由は、主にプロデューサーの再試行とコンシューマーの再試行の 2 つになります。アプリケーションは、個々のレコードを複数回処理することを見込んで、適切に処理する必要があります。

プロデューサーの再試行

プロデューサーで PutRecord を呼び出してから Amazon Kinesis Data Streams の受信確認を受け取るまでの間に、ネットワーク関連のタイムアウトが発生する場合があります。この場合、プロデューサーはレコードが Kinesis Data Streams に配信されたかどうかを確認できません。各レコードがアプリケーションにとって重要であれば、同じデータを使用して呼び出しを再試行するようにプロデューサーが定義されているはずです。同じデータを使用した PutRecord の呼び出しが両方とも Kinesis Data Streams に正常にコミットされると、Kinesis Data Streams レコードは 2 つになります。2 つのレコードには同一のデータがありますが、一意のシーケンス番号も付けられています。厳密な保証を必要とするアプリケーションは、レコード内にプライマリキーを埋め込んで、後ほど処理するときに重複を削除する必要があります。プロデューサーの再試行に起因する重複の数が、コンシューマーの再試行に起因する重複の数より通常は少ないことに注意してください。

注記

AWS SDK PutRecord を使用する場合は、「AWS SDK およびツールユーザーガイド」で SDK 再試行動作について説明します。

コンシューマーの再試行

コンシューマー (データ処理アプリケーション) の再試行は、レコードプロセッサが再開するときに発生します。同じシャードのレコードプロセッサは、次の場合に再開します。

  1. ワーカーが予期せず終了する

  2. ワーカーインスタンスが追加または削除される

  3. シャードがマージまたは分割される

  4. アプリケーションがデプロイされる

これらすべての場合において、shards-to-worker-to-record-processor マッピングは、処理の負荷を分散するために継続的に更新されます。他のインスタンスに移行されたシャードプロセッサは、最後のチェックポイントからレコードの処理を再開します。これにより、以下の例にあるような重複レコード処理が発生します。負荷分散の詳細については、シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。を参照してください。

例: コンシューマーの再試行によるレコードの再配信

この例では、ストリームから継続的にレコードを読み取り、ローカルファイルにレコードを集約し、このファイルを Amazon S3 にアップロードするアプリケーションがあるとします。分かりやすくするため、1 つのシャードと、このシャードを処理する 1 つのワーカーのみがあるとします。最後のチェックポイントがレコード番号 10,000 であると仮定して、次の例の一連のイベントを考えてみます。

  1. ワーカーで、シャードから次のレコードのバッチを読み込みます (1,0001 から 20000)。

  2. 次に、ワーカーがレコードのバッチを関連付けられたレコードプロセッサに渡します。

  3. レコードプロセッサはデータを集約し、Amazon S3 ファイルを作成して、このファイルを Amazon S3 に正常にアップロードします。

  4. 新しいチェックポイントが生成される前に、ワーカーが予期せず終了します。

  5. アプリケーション、ワーカー、およびレコードプロセッサが再開します。

  6. ワーカーは、正常な最後のチェックポイント (この場合は 1,0001) から読み込みを開始しました。

したがって、1,0001 から 20000 のレコードは複数回使用されます。

コンシューマーの再試行に対する弾力性

レコードが複数回処理される可能性はあるものの、アプリケーションでは、レコードが 1 回だけ処理されたかのような付随効果 (冪等処理) を提示する場合があります。この問題に対するソリューションは、複雑性と正確性に応じて異なります。最終的なデータの送信先が重複を適切に処理できる場合は、冪等処理の実行は最終送信先に任せることをお勧めします。例えば、Opensearch では、バージョニングと一意の ID の組み合わせを使用して重複処理を防ぐことができます。

前セクションのアプリケーション例では、ストリームから継続的にレコードを読み取り、レコードをローカルファイルに集約して、ファイルを Amazon S3 にアップロードします。図に示すように、1,0001 から 20000 のレコードが複数回使用されることにより、複数の Amazon S3 ファイルのデータは同じになります。この例からの重複を軽減する方法の 1 つは、ステップ 3 での次のスキーマの使用を確実にすることです。

  1. レコードプロセッサは、各 Amazon S3 ファイルに固定のレコード番号 (5000 など) を使用します。

  2. ファイル名には、このスキーマ (Amazon S3 プレフィックス、シャード ID、および First-Sequence-Num) を使用します。この場合は、sample-shard000001-10001 のようになります。

  3. Amazon S3 ファイルをアップロードした後で、Last-Sequence-Num を指定してチェックポイントを作成します。この場合は、レコード番号 15000 にチェックポイントが作成されます。

このスキーマを使用すると、レコードが複数回処理されても、Amazon S3 ファイルには同じ名前と同じデータが保持されます。再試行しても、同じファイルに同じデータが複数回書き込まれるだけになります。

リシャーディング操作の場合は、シャードに残っているレコードの数が必要な一定数よりも少ないことがあります。この場合、shutdown() メソッドは Amazon S3 にファイルをフラッシュし、最後のシーケンス番号でチェックポイントを作成する必要があります。上記のスキーマは、リシャーディング操作との互換性もあります。