

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 通过以下方式培养更具吸引力的粉丝消费者 适用于 Java 的 AWS SDK
<a name="building-enhanced-consumers-api"></a>

*增强型扇出*是一种 Amazon Kinesis Data Streams 功能，支持消费端从数据流中接收记录，其中每分片每秒专用吞吐量高达 2MB 数据。使用增强型扇出功能的消费端不必与接收流中数据的其他消费端争夺。有关更多信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的消费端。

**使用 Kinesis Data Streams API 注册采用增强型扇出功能的消费端**

1. 致电[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)将您的应用程序注册为使用增强型扇出功能的使用者。Kinesis Data Streams 为消费端生成一个 Amazon 资源名称（ARN）并在响应中返回此名称。

1. 要开始监听特定分片，请在调用中将使用者 ARN 传递给。[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)然后，Kinesis Data Streams 开始通过 HTTP/2 连接以[SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)类型的事件形式将记录从该分片推送给你。此连接将保持打开状态长达 5 分钟。如果要在调用返回的记录正常或异常[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)完成后继续从分片接收记录，请[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)再次调用。`future`
**注意**  
到达当前分片的末尾时，`SubscribeToShard` API 还会返回当前分片的子分片列表。

1. 要取消注册使用增强型扇出功能的消费者，请致电。[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

以下代码是一个示例，演示如何为消费端订阅分片、定期续订订阅以及处理事件。

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 如果 `event.ContinuationSequenceNumber` 返回 `null`，则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 `CLOSED` 状态，并且您已从其中读取了所有可用的数据记录。在这种情况下，按照上文示例所述，您可以使用 `event.childShards` 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息，请参阅 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。