

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# で拡張ファンアウトコンシューマーを開発する AWS SDK for Java
<a name="building-enhanced-consumers-api"></a>

*拡張ファンアウト*は Amazon Kinesis Data Streams の機能です。この機能を使用すると、コンシューマーは、シャードあたり 1 秒間に最大 2 MB のデータの専用スループットで、データストリームからレコードを受け取ることができます。拡張ファンアウトを使用するコンシューマーは、ストリームからデータを受け取っている他のコンシューマーと競合する必要はありません。詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md)を参照してください。

拡張ファンアウトを inesis Data Streams で使用するコンシューマーを構築するには、API オペレーションを使用します。

**Kinesis Data Streams API を使用して拡張ファンアウトでコンシューマーを登録するには**

1. 拡張ファンアウトを使用するコンシューマーとしてアプリケーションを登録するには、[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) を呼び出します。Kinesis Data Streams は、コンシューマーの Amazon リソースネーム (ARN) を生成し、レスポンスで返します。

1. 特定のシャードのリッスンを開始するには、[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) への呼び出しでコンシューマー ARN を渡します。シャードのレコードは Kinesis Data Streams によって、[SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html) イベントの形式で HTTP/2 接続経由で送信されます。接続は最大 5 分間開いたままです。[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) への呼び出しによって返される `future` が正常または例外的に完了した後も、引き続きシャードからレコードを受け取る場合は、[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) を再度呼び出します。
**注記**  
`SubscribeToShard` API は、現在のシャードの終わりに達すると、現在のシャードの子シャードのリストも返します。

1. 拡張ファンアウトを使用しているコンシューマーの登録を解除するには、[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html) を呼び出します。

次のコードは、シャードへのコンシューマーのサブスクライブ、サブスクリプションの定期更新、イベントの処理を行う方法の例です。

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 `event.ContinuationSequenceNumber` が `null` を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 `CLOSED` 状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。このシナリオでは、上記の例のように、`event.childShards` を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、[ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html) を参照してください。