を使用してスループットを共有してカスタムコンシューマーを開発する AWS SDK for Java - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

を使用してスループットを共有してカスタムコンシューマーを開発する AWS SDK for Java

全体で共有されているカスタム Kinesis Data Streams コンシューマーを開発する方法の 1 つは、Amazon Kinesis Data Streams を使用することですAPIs。このセクションでは、 for Java APIsでの AWS SDK Kinesis Data Streams の使用について説明します。このセクションの Java サンプルコードは、基本的なKDSAPIオペレーションを実行する方法を示し、オペレーションタイプ別に論理的に分割されています。

これらのサンプルコードは、本稼働環境対応のコードではありません。考えられる例外のすべてを確認するものではなく、潜在的なセキュリティまたはパフォーマンス事項も考慮されていません。

他のプログラミング言語APIsを使用して Kinesis Data Streams を呼び出すことができます。利用可能なすべての の詳細については AWS SDKs、「Amazon Web Services での開発を開始する」を参照してください。

重要

全体で共有されているカスタム Kinesis Data Streams コンシューマーを開発するための推奨方法は、Kinesis Client Library () を使用することですKCL。KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis データストリームからのデータの消費と処理を支援します。詳細については、「 を使用した共有スループットのカスタムコンシューマーの開発KCL」を参照してください。

ストリームからデータを取得する

Kinesis Data Streams には、データストリームからレコードを取得するために呼び出すことができる メソッドgetShardIteratorgetRecordsメソッドAPIsが含まれています。これはプルモデルで、コードはデータストリームのシャードからデータを直接取得します。

重要

データストリームからレコードKCLを取得するには、 が提供するレコードプロセッサのサポートを使用することをお勧めします。これは、データを処理するコードを組み込むプッシュモデルです。は、データストリームからデータレコードKCLを取得し、アプリケーションコードに配信します。さらに、 はフェイルオーバー、リカバリ、ロードバランシング機能KCLを提供します。詳細については、「 を使用した共有スループットのカスタムコンシューマーの開発KCL」を参照してください。

ただし、場合によっては Kinesis Data Streams の使用が必要になることがありますAPIs。例えば、データストリームのモニタリングやデバッグのためのカスタムツールを実装する場合です。

重要

Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、「データ保持期間を変更する」を参照してください。

シャードイテレーターを使用する

レコードは、シャード単位でストリームから取得します。シャードごと、およびそのシャードから取得するレコードのバッチごとに、シャードイテレーターを取得する必要があります。シャードイテレーターは、レコードの取得元になるシャードを指定するために getRecordsRequest オブジェクトで使用します。シャードイテレーターに関連付けられるタイプによって、レコードを取得するシャード内の位置が決まります (詳細については、このセクションの後半を参照してください)。シャードイテレーターを使用する前に、シャードを取得する必要があります。詳細については、「シャードを一覧表示する」を参照してください。

最初のシャードイテレーターは、getShardIterator メソッドを使用して取得します。追加のレコードバッチのシャードイテレーターは、getNextShardIterator メソッドによって返された getRecordsResult オブジェクトの getRecords メソッドを使用して取得します。シャードイテレーターの有効時間は 5 分間です。有効時間内にシャードイテレーターを使用すると、新しいシャードイテレーターが取得されます。各シャードイテレーターは、使用後も 5 分間有効です。

最初のシャードイテレーターを取得するには、GetShardIteratorRequest をインスタンス化してから、getShardIterator メソッドに渡します。リクエストを設定するには、ストリームとシャード ID を指定します。 AWS アカウントでストリームを取得する方法については、「」を参照してくださいストリームの一覧表示。ストリーム内のシャードを取得する方法については、シャードを一覧表示するを参照してください。

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

このサンプルコードは、最初のシャードイテレーターを取得するときのイテレータータイプとして TRIM_HORIZON を指定しています。このイテレーター型を指定することで、レコードはまず、シャードに直近に追加されたレコード (tip) からではなく、シャードに最初に追加されたレコードから返されます。以下は、使用可能なイテレータータイプです。

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

詳細については、「」を参照してくださいShardIteratorType

イテレータータイプによっては、タイプに加えてシーケンス番号を指定する必要があります。以下がその例です。

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

getRecords を使用してレコードを取得したら、レコードの getSequenceNumber メソッドを呼び出すことによって、レコードのシーケンス番号を取得できます。

record.getSequenceNumber()

さらに、データストリームにレコードを追加するコードは、getSequenceNumber の結果に対して putRecord を呼び出すことで、追加されたレコードのシーケンス番号を取得できます。

lastSequenceNumber = putRecordResult.getSequenceNumber();

シーケンス番号は、レコードの順序が厳密に増加することを保証するために使用できます。詳細については、PutRecord 例のサンプルコードを参照してください。

を使用する GetRecords

シャードイテレーターを取得したら、GetRecordsRequest オブジェクトをインスタンス化します。リクエストのイテレーターは、setShardIterator メソッドを使用して指定します。

オプションとして、setLimit メソッドを使用することで、取得するレコードの数を設定することもできます。getRecords が返すレコードの数は、常にこの制限以下になります。この制限を指定しない場合、getRecords は取得したレコードの 10 MB を返します。次のサンプルコードは、この制限を 25 レコードに設定します。

レコードが返されない場合、シャードイテレーターが参照するシーケンス番号には、このシャードから現在利用できるデータレコードが存在しないことを意味します。この状況では、アプリケーションが、ストリームのデータソースに対して適切な時間を待機する必要があります。その後、getRecords への以前のコールで返されたシャードイテレーターを使用して、シャードからのデータの取得を再試行します。

getRecordsRequest メソッドに getRecords を渡し、返された値を getRecordsResult オブジェクトとしてキャプチャします。データレコードを取得するには、getRecords オブジェクトに対して getRecordsResult メソッドを呼び出します。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

getRecords への別のコールを準備するために、getRecordsResult から次のシャードイテレーターを取得します。

shardIterator = getRecordsResult.getNextShardIterator();

最良の結果を得るには、getRecords へのコール間で少なくとも 1 秒間 (1,000 ミリ秒) スリープして、getRecords の頻度制限を超えないようにしてください。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

一般的に、テストシナリオで単一のレコードを取得している場合でも、getRecords はループで呼び出す必要があります。getRecords への単一のコールは、後続のシーケンス番号ではシャード内に複数のレコードがあるという場合でも、空のレコードリストを返す可能性があります。この状況が発生すると、空のレコードリストとともに返された NextShardIterator が、シャード内の後続のシーケンス番号を参照し、最終的には連続する getRecords コールがレコードを返します。次のサンプルは、ループの使用を表すものです。

例: getRecords

以下のコード例には、このセクションで説明した getRecords のヒント (ループでの呼び出しなど) が反映されています。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Kinesis Client Library を使用している場合は、データを返す前に複数回呼び出しが行われる場合があります。この動作は設計上のものであり、 KCLまたはデータに問題があることを示すものではありません。

リシャードに適応する

getRecordsResult.getNextShardIteratornull を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 CLOSED 状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。

このシナリオでは、getRecordsResult.childShards を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、「」を参照してくださいChildShard

分割の場合は、2 つの新しいシャードの両方に、以前処理していたシャードのシャード ID に等しい parentShardId があります。adjacentParentShardId の値は、これらのシャード両方で null になります。

マージの場合は、マージによって作成された単一の新しいシャードに、一方の親シャードの ID に等しい parentShardId と、もう一方の親シャードのシャード ID に等しい adjacentParentShardId があります。アプリケーションはこれらのいずれかのシャードからすべてのデータを読み取り済みです。これは getRecordsResult.getNextShardIterator から null が返されたシャードです。アプリケーションでデータの順序が重要である場合、結合によって作成された子シャードから新しいデータを読み取る前に、その他の親シャードからもすべてのデータを読み取るようにする必要があります。

複数のプロセッサを使用してストリームからデータを取得し (たとえば、シャードごとに 1 つのプロセッサ)、シャードの分割または結合を行う場合、プロセッサの数を増減して、シャードの数の変化に適応させます。

シャードの状態 (CLOSED など) の説明を含むリシャーディングの詳細については、ストリームのリシャードを参照してください。