翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
を使用してスループットを共有してカスタムコンシューマーを開発する AWS SDK for Java
全体で共有されているカスタム Kinesis Data Streams コンシューマーを開発する方法の 1 つは、Amazon Kinesis Data Streams を使用することですAPIs。このセクションでは、 for Java APIsでの AWS SDK Kinesis Data Streams の使用について説明します。このセクションの Java サンプルコードは、基本的なKDSAPIオペレーションを実行する方法を示し、オペレーションタイプ別に論理的に分割されています。
これらのサンプルコードは、本稼働環境対応のコードではありません。考えられる例外のすべてを確認するものではなく、潜在的なセキュリティまたはパフォーマンス事項も考慮されていません。
他のプログラミング言語APIsを使用して Kinesis Data Streams を呼び出すことができます。利用可能なすべての の詳細については AWS SDKs、「Amazon Web Services での開発を開始する
重要
全体で共有されているカスタム Kinesis Data Streams コンシューマーを開発するための推奨方法は、Kinesis Client Library () を使用することですKCL。KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis データストリームからのデータの消費と処理を支援します。詳細については、「 を使用した共有スループットのカスタムコンシューマーの開発KCL」を参照してください。
ストリームからデータを取得する
Kinesis Data Streams には、データストリームからレコードを取得するために呼び出すことができる メソッドgetShardIterator
と getRecords
メソッドAPIsが含まれています。これはプルモデルで、コードはデータストリームのシャードからデータを直接取得します。
重要
データストリームからレコードKCLを取得するには、 が提供するレコードプロセッサのサポートを使用することをお勧めします。これは、データを処理するコードを組み込むプッシュモデルです。は、データストリームからデータレコードKCLを取得し、アプリケーションコードに配信します。さらに、 はフェイルオーバー、リカバリ、ロードバランシング機能KCLを提供します。詳細については、「 を使用した共有スループットのカスタムコンシューマーの開発KCL」を参照してください。
ただし、場合によっては Kinesis Data Streams の使用が必要になることがありますAPIs。例えば、データストリームのモニタリングやデバッグのためのカスタムツールを実装する場合です。
重要
Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、「データ保持期間を変更する」を参照してください。
シャードイテレーターを使用する
レコードは、シャード単位でストリームから取得します。シャードごと、およびそのシャードから取得するレコードのバッチごとに、シャードイテレーターを取得する必要があります。シャードイテレーターは、レコードの取得元になるシャードを指定するために getRecordsRequest
オブジェクトで使用します。シャードイテレーターに関連付けられるタイプによって、レコードを取得するシャード内の位置が決まります (詳細については、このセクションの後半を参照してください)。シャードイテレーターを使用する前に、シャードを取得する必要があります。詳細については、「シャードを一覧表示する」を参照してください。
最初のシャードイテレーターは、getShardIterator
メソッドを使用して取得します。追加のレコードバッチのシャードイテレーターは、getNextShardIterator
メソッドによって返された getRecordsResult
オブジェクトの getRecords
メソッドを使用して取得します。シャードイテレーターの有効時間は 5 分間です。有効時間内にシャードイテレーターを使用すると、新しいシャードイテレーターが取得されます。各シャードイテレーターは、使用後も 5 分間有効です。
最初のシャードイテレーターを取得するには、GetShardIteratorRequest
をインスタンス化してから、getShardIterator
メソッドに渡します。リクエストを設定するには、ストリームとシャード ID を指定します。 AWS アカウントでストリームを取得する方法については、「」を参照してくださいストリームの一覧表示。ストリーム内のシャードを取得する方法については、シャードを一覧表示するを参照してください。
String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();
このサンプルコードは、最初のシャードイテレーターを取得するときのイテレータータイプとして TRIM_HORIZON
を指定しています。このイテレーター型を指定することで、レコードはまず、シャードに直近に追加されたレコード (tip) からではなく、シャードに最初に追加されたレコードから返されます。以下は、使用可能なイテレータータイプです。
-
AT_SEQUENCE_NUMBER
-
AFTER_SEQUENCE_NUMBER
-
AT_TIMESTAMP
-
TRIM_HORIZON
-
LATEST
詳細については、「」を参照してくださいShardIteratorType。
イテレータータイプによっては、タイプに加えてシーケンス番号を指定する必要があります。以下がその例です。
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
getRecords
を使用してレコードを取得したら、レコードの getSequenceNumber
メソッドを呼び出すことによって、レコードのシーケンス番号を取得できます。
record.getSequenceNumber()
さらに、データストリームにレコードを追加するコードは、getSequenceNumber
の結果に対して putRecord
を呼び出すことで、追加されたレコードのシーケンス番号を取得できます。
lastSequenceNumber = putRecordResult.getSequenceNumber();
シーケンス番号は、レコードの順序が厳密に増加することを保証するために使用できます。詳細については、PutRecord 例のサンプルコードを参照してください。
を使用する GetRecords
シャードイテレーターを取得したら、GetRecordsRequest
オブジェクトをインスタンス化します。リクエストのイテレーターは、setShardIterator
メソッドを使用して指定します。
オプションとして、setLimit
メソッドを使用することで、取得するレコードの数を設定することもできます。getRecords
が返すレコードの数は、常にこの制限以下になります。この制限を指定しない場合、getRecords
は取得したレコードの 10 MB を返します。次のサンプルコードは、この制限を 25 レコードに設定します。
レコードが返されない場合、シャードイテレーターが参照するシーケンス番号には、このシャードから現在利用できるデータレコードが存在しないことを意味します。この状況では、アプリケーションが、ストリームのデータソースに対して適切な時間を待機する必要があります。その後、getRecords
への以前のコールで返されたシャードイテレーターを使用して、シャードからのデータの取得を再試行します。
getRecordsRequest
メソッドに getRecords
を渡し、返された値を getRecordsResult
オブジェクトとしてキャプチャします。データレコードを取得するには、getRecords
オブジェクトに対して getRecordsResult
メソッドを呼び出します。
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();
getRecords
への別のコールを準備するために、getRecordsResult
から次のシャードイテレーターを取得します。
shardIterator = getRecordsResult.getNextShardIterator();
最良の結果を得るには、getRecords
へのコール間で少なくとも 1 秒間 (1,000 ミリ秒) スリープして、getRecords
の頻度制限を超えないようにしてください。
try { Thread.sleep(1000); } catch (InterruptedException e) {}
一般的に、テストシナリオで単一のレコードを取得している場合でも、getRecords
はループで呼び出す必要があります。getRecords
への単一のコールは、後続のシーケンス番号ではシャード内に複数のレコードがあるという場合でも、空のレコードリストを返す可能性があります。この状況が発生すると、空のレコードリストとともに返された NextShardIterator
が、シャード内の後続のシーケンス番号を参照し、最終的には連続する getRecords
コールがレコードを返します。次のサンプルは、ループの使用を表すものです。
例: getRecords
以下のコード例には、このセクションで説明した getRecords
のヒント (ループでの呼び出しなど) が反映されています。
// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }
Kinesis Client Library を使用している場合は、データを返す前に複数回呼び出しが行われる場合があります。この動作は設計上のものであり、 KCLまたはデータに問題があることを示すものではありません。
リシャードに適応する
getRecordsResult.getNextShardIterator
が null
を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 CLOSED
状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。
このシナリオでは、getRecordsResult.childShards
を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、「」を参照してくださいChildShard。
分割の場合は、2 つの新しいシャードの両方に、以前処理していたシャードのシャード ID に等しい parentShardId
があります。adjacentParentShardId
の値は、これらのシャード両方で null
になります。
マージの場合は、マージによって作成された単一の新しいシャードに、一方の親シャードの ID に等しい parentShardId
と、もう一方の親シャードのシャード ID に等しい adjacentParentShardId
があります。アプリケーションはこれらのいずれかのシャードからすべてのデータを読み取り済みです。これは getRecordsResult.getNextShardIterator
から null
が返されたシャードです。アプリケーションでデータの順序が重要である場合、結合によって作成された子シャードから新しいデータを読み取る前に、その他の親シャードからもすべてのデータを読み取るようにする必要があります。
複数のプロセッサを使用してストリームからデータを取得し (たとえば、シャードごとに 1 つのプロセッサ)、シャードの分割または結合を行う場合、プロセッサの数を増減して、シャードの数の変化に適応させます。
シャードの状態 (CLOSED
など) の説明を含むリシャーディングの詳細については、ストリームのリシャードを参照してください。