使用 Kinesis Data Streams API 开发具有增强扇出功能的消费端 - Amazon Kinesis Data Streams

使用 Kinesis Data Streams API 开发具有增强扇出功能的消费端

增强型扇出是一种 Amazon Kinesis Data Streams 功能,支持消费端从数据流中接收记录,其中每分片每秒专用吞吐量高达 2MB 数据。使用增强型扇出功能的消费端不必与接收流中数据的其他消费端争夺。有关更多信息,请参阅 开发具有专用吞吐量的自定义消费端(增强扇出型)

可以使用 API 操作构建在 Kinesis Data Streams 中使用增强型扇出功能的消费端。

使用 Kinesis Data Streams API 注册采用增强型扇出功能的消费端
  1. 调用 RegisterStreamConsumer 将应用程序注册为使用增强型扇出功能的消费端。Kinesis Data Streams 为消费端生成一个 Amazon 资源名称(ARN)并在响应中返回此名称。

  2. 要开始侦听特定分片,请将调用中的消费端 ARN 传递给 SubscribeToShard。之后,Kinesis Data Streams 会通过 HTTP/2 连接将分片中的记录以 SubscribeToShardEvent 类型的事件形式推送给您。此连接将保持打开状态长达 5 分钟。如果要在通过调用 SubscribeToShard 返回的 future 正常或异常完成之后继续接收分片中的记录,请再次调用 SubscribeToShard

    注意

    到达当前分片的末尾时,SubscribeToShard API 还会返回当前分片的子分片列表。

  3. 要取消注册使用增强型扇出功能的消费端,请调用 DeregisterStreamConsumer

以下代码是一个示例,演示如何为消费端订阅分片、定期续订订阅以及处理事件。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

如果 event.ContinuationSequenceNumber 返回 null,则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 CLOSED 状态,并且您已从其中读取了所有可用的数据记录。在这种情况下,按照上文示例所述,您可以使用 event.childShards 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息,请参阅 ChildShard