

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# を使用してコンシューマーを開発する AWS SDK for Java
<a name="develop-consumers-sdk"></a>

 Amazon Kinesis Data Streams API を使用してカスタムコンシューマーを開発できます。このセクションでは、 AWS SDK for Javaでの Kinesis Data Streams API の使用について説明します。

**重要**  
全体で共有されるカスタム Kinesis Data Streams コンシューマーを開発するには、Kinesis Client Library (KCL) を使用することをお勧めします。KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis Data Streams からデータを消費および処理するのに役立ちます。詳細については、「[Java で KCL を使用してコンシューマーを開発する](develop-kcl-consumers-java.md)」を参照してください。

**Topics**
+ [を使用して共有スループットコンシューマーを開発する AWS SDK for Java](developing-consumers-with-sdk.md)
+ [で拡張ファンアウトコンシューマーを開発する AWS SDK for Java](building-enhanced-consumers-api.md)
+ [AWS Glue Schema Registry を使用してデータを操作する](building-enhanced-consumers-glue-schema-registry.md)

# を使用して共有スループットコンシューマーを開発する AWS SDK for Java
<a name="developing-consumers-with-sdk"></a>

共有スループットを利用するカスタム Kinesis Data Streams コンシューマーを開発する方法の 1 つは、 AWS SDK for Javaを使用して Amazon Kinesis Data Streams API を使用することです。このセクションでは、 AWS SDK for Javaでの Kinesis Data Streams API の使用について説明します。また、他のプログラミング言語を使用して Kinesis Data Streams API を呼び出すこともできます。利用可能なすべての AWS SDKs[「アマゾン ウェブ サービスの開発を開始する](https://aws.amazon.com/developers/getting-started/)」を参照してください。

このセクションで紹介する Java サンプルコードは、基本的な Kinesis Data Streams API オペレーションを実行する方法を示しており、オペレーションタイプ別に論理的に分割されています。これらのサンプルコードは、本稼働環境対応のコードではありません。考えられる例外のすべてを確認するものではなく、潜在的なセキュリティまたはパフォーマンス事項も考慮されていません。

**Topics**
+ [ストリームからのデータを取得する](#kinesis-using-sdk-java-get-data)
+ [シャードイテレーターを使用する](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [GetRecords を使用する](#kinesis-using-sdk-java-get-data-getrecords)
+ [リシャードに適応する](#kinesis-using-sdk-java-get-data-reshard)

## ストリームからのデータを取得する
<a name="kinesis-using-sdk-java-get-data"></a>

Kinesis Data Streams API には、データストリームからレコードを取得するために呼び出すことができる `getShardIterator` および `getRecords` メソッドが含まれています。これはプルモデルで、コードはデータストリームのシャードからデータを直接取得します。

**重要**  
KCL によって提供されているレコードプロセッサのサポートを使用して、データストリームからレコードを取得することをお勧めします。これは、データを処理するコードを組み込むプッシュモデルです。KCL は、ストリームからデータレコードを取り出し、アプリケーションコードに配信します。さらに、CL には、フェイルオーバー、リカバリ、負荷分散の機能が用意されています。詳細については、[KCL を使用したスループット共有カスタムコンシューマーの開発](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)を参照してください。

ただし、状況によっては Kinesis Data Streams API を使用した方がよい場合があります。例えば、データストリームのモニタリングやデバッグのためのカスタムツールを実装する場合です。

**重要**  
Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、「[データ保持期間を変更する](kinesis-extended-retention.md)」を参照してください。

## シャードイテレーターを使用する
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

レコードは、シャード単位でストリームから取得します。シャードごと、およびそのシャードから取得するレコードのバッチごとに、シャードイテレーターを取得する必要があります。シャードイテレーターは、レコードの取得元になるシャードを指定するために `getRecordsRequest` オブジェクトで使用します。シャードイテレーターに関連付けられるタイプによって、レコードを取得するシャード内の位置が決まります (詳細については、このセクションの後半を参照してください)。シャードイテレーターを使用する前にシャードを取得する必要があります。詳細については、「[シャードを一覧表示する](kinesis-using-sdk-java-list-shards.md)」を参照してください。

最初のシャードイテレーターは、`getShardIterator` メソッドを使用して取得します。追加のレコードバッチのシャードイテレーターは、`getNextShardIterator` メソッドによって返された `getRecordsResult` オブジェクトの `getRecords` メソッドを使用して取得します。シャードイテレーターの有効時間は 5 分間です。有効時間内にシャードイテレーターを使用すると、新しいシャードイテレーターが取得されます。各シャードイテレーターは、使用後も 5 分間有効です。

最初のシャードイテレーターを取得するには、`GetShardIteratorRequest` をインスタンス化してから、`getShardIterator` メソッドに渡します。リクエストを設定するには、ストリームとシャード ID を指定します。 AWS アカウントでストリームを取得する方法については、「」を参照してください[ストリームの一覧表示](kinesis-using-sdk-java-list-streams.md)。ストリーム内のシャードを取得する方法については、[シャードを一覧表示する](kinesis-using-sdk-java-list-shards.md)を参照してください。

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

このサンプルコードは、最初のシャードイテレーターを取得するときのイテレータータイプとして `TRIM_HORIZON` を指定しています。このイテレーター型を指定することで、レコードはまず、シャードに直近に追加されたレコード (*tip*) からではなく、シャードに最初に追加されたレコードから返されます。以下は、使用可能なイテレータータイプです。
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

詳細については、[ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType)を参照してください。

イテレータータイプによっては、タイプに加えてシーケンス番号を指定する必要があります。以下がその例です。

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

`getRecords` を使用してレコードを取得したら、レコードの `getSequenceNumber` メソッドを呼び出すことによって、レコードのシーケンス番号を取得できます。

```
record.getSequenceNumber()
```

さらに、データストリームにレコードを追加するコードは、`getSequenceNumber` の結果に対して `putRecord` を呼び出すことで、追加されたレコードのシーケンス番号を取得できます。

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

シーケンス番号は、レコードの順序が厳密に増加することを保証するために使用できます。詳細については、[PutRecord の例](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example) のサンプルコードを参照してください。

## GetRecords を使用する
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

シャードイテレーターを取得したら、`GetRecordsRequest` オブジェクトをインスタンス化します。リクエストのイテレーターは、`setShardIterator` メソッドを使用して指定します。

オプションとして、`setLimit` メソッドを使用することで、取得するレコードの数を設定することもできます。`getRecords` が返すレコードの数は、常にこの制限以下になります。この制限を指定しない場合、`getRecords` は取得したレコードの 10 MB を返します。次のサンプルコードは、この制限を 25 レコードに設定します。

レコードが返されない場合、シャードイテレーターが参照するシーケンス番号には、このシャードから現在利用できるデータレコードが存在しないことを意味します。この状況では、アプリケーションが、ストリームのデータソースに対して適切な時間を待機する必要があります。その後、`getRecords` への以前のコールで返されたシャードイテレーターを使用して、シャードからのデータの取得を再試行します。

`getRecordsRequest` メソッドに `getRecords` を渡し、返された値を `getRecordsResult` オブジェクトとしてキャプチャします。データレコードを取得するには、`getRecords` オブジェクトに対して `getRecordsResult` メソッドを呼び出します。

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

`getRecords` への別のコールを準備するために、`getRecordsResult` から次のシャードイテレーターを取得します。

```
shardIterator = getRecordsResult.getNextShardIterator();
```

最良の結果を得るには、`getRecords` へのコール間で少なくとも 1 秒間 (1,000 ミリ秒) スリープして、`getRecords` の頻度制限を超えないようにしてください。

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

一般的に、テストシナリオで単一のレコードを取得している場合でも、`getRecords` はループで呼び出す必要があります。`getRecords` への単一のコールは、後続のシーケンス番号ではシャード内に複数のレコードがあるという場合でも、空のレコードリストを返す可能性があります。この状況が発生すると、空のレコードリストとともに返された `NextShardIterator` が、シャード内の後続のシーケンス番号を参照し、最終的には連続する `getRecords` コールがレコードを返します。次のサンプルは、ループの使用を表すものです。

**例: getRecords**  
以下のコード例には、このセクションで説明した `getRecords` のヒント (ループでの呼び出しなど) が反映されています。

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Kinesis Client Library を使用している場合は、データを返す前に複数回呼び出しが行われる場合があります。この動作は仕様であり、KCL やデータの問題を示すものではありません。

## リシャードに適応する
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 `getRecordsResult.getNextShardIterator` が `null` を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 `CLOSED` 状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。

 このシナリオでは、`getRecordsResult.childShards` を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、[ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)を参照してください。

 分割の場合は、2 つの新しいシャードの両方に、以前処理していたシャードのシャード ID に等しい `parentShardId` があります。`adjacentParentShardId` の値は、これらのシャード両方で `null` になります。

 マージの場合は、マージによって作成された単一の新しいシャードに、一方の親シャードの ID に等しい `parentShardId` と、もう一方の親シャードのシャード ID に等しい `adjacentParentShardId` があります。アプリケーションはこれらのいずれかのシャードからすべてのデータを読み取り済みです。これは `getRecordsResult.getNextShardIterator` から `null` が返されたシャードです。アプリケーションでデータの順序が重要である場合、結合によって作成された子シャードから新しいデータを読み取る前に、その他の親シャードからもすべてのデータを読み取るようにする必要があります。

 複数のプロセッサを使用してストリームからデータを取得し (たとえば、シャードごとに 1 つのプロセッサ)、シャードの分割または結合を行う場合、プロセッサの数を増減して、シャードの数の変化に適応させます。

 シャードの状態 (`CLOSED` など) の説明を含むリシャーディングの詳細については、[ストリームをリシャーディングする](kinesis-using-sdk-java-resharding.md)を参照してください。

# で拡張ファンアウトコンシューマーを開発する AWS SDK for Java
<a name="building-enhanced-consumers-api"></a>

*拡張ファンアウト*は Amazon Kinesis Data Streams の機能です。この機能を使用すると、コンシューマーは、シャードあたり 1 秒間に最大 2 MB のデータの専用スループットで、データストリームからレコードを受け取ることができます。拡張ファンアウトを使用するコンシューマーは、ストリームからデータを受け取っている他のコンシューマーと競合する必要はありません。詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md)を参照してください。

拡張ファンアウトを inesis Data Streams で使用するコンシューマーを構築するには、API オペレーションを使用します。

**Kinesis Data Streams API を使用して拡張ファンアウトでコンシューマーを登録するには**

1. 拡張ファンアウトを使用するコンシューマーとしてアプリケーションを登録するには、[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) を呼び出します。Kinesis Data Streams は、コンシューマーの Amazon リソースネーム (ARN) を生成し、レスポンスで返します。

1. 特定のシャードのリッスンを開始するには、[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) への呼び出しでコンシューマー ARN を渡します。シャードのレコードは Kinesis Data Streams によって、[SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html) イベントの形式で HTTP/2 接続経由で送信されます。接続は最大 5 分間開いたままです。[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) への呼び出しによって返される `future` が正常または例外的に完了した後も、引き続きシャードからレコードを受け取る場合は、[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) を再度呼び出します。
**注記**  
`SubscribeToShard` API は、現在のシャードの終わりに達すると、現在のシャードの子シャードのリストも返します。

1. 拡張ファンアウトを使用しているコンシューマーの登録を解除するには、[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html) を呼び出します。

次のコードは、シャードへのコンシューマーのサブスクライブ、サブスクリプションの定期更新、イベントの処理を行う方法の例です。

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 `event.ContinuationSequenceNumber` が `null` を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 `CLOSED` 状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。このシナリオでは、上記の例のように、`event.childShards` を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、[ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html) を参照してください。

# AWS Glue Schema Registry を使用してデータを操作する
<a name="building-enhanced-consumers-glue-schema-registry"></a>

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue スキーマレジストリを使用すると、ストリーミングアプリケーション内のエンドツーエンドのデータ品質とデータガバナンスを改善できます。詳細については、[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を参照してください。この統合を設定する方法の 1 つは、Java SDK で利用可能な `GetRecords` Kinesis Data Streams API AWS を使用することです。

Kinesis Data Streams API を使用して Kinesis Data `GetRecords`Streams とスキーマレジストリの統合を設定する方法の詳細については、「ユースケース: Amazon Kinesis Data Streams と Glue スキーマレジストリの統合」の「Kinesis Data Streams APIs を使用したデータの操作」セクションを参照してください。 APIs [ Amazon Kinesis AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)