本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
通过以下方式培养更具吸引力的粉丝消费者 AWS SDK for Java
增强型扇出是一种 Amazon Kinesis Data Streams 功能,支持消费端从数据流中接收记录,其中每分片每秒专用吞吐量高达 2MB 数据。使用增强型扇出功能的消费端不必与接收流中数据的其他消费端争夺。有关更多信息,请参阅 开发具有专用吞吐量的增强型扇出消费者。
您可以使用API操作在 Kinesis Data Streams 中构建使用增强型扇出功能的使用者。
使用 Kinesis Data Streams 注册具有增强型扇出功能的消费者 API
-
致电RegisterStreamConsumer将您的应用程序注册为使用增强型扇出功能的使用者。Kinesis Data Streams 为使用者生成一个亚马逊资源名称 ARN (),并在响应中将其返回。
-
要开始收听特定分片,请在调用ARN中将使用者传递给。SubscribeToShard然后,Kinesis Data Streams 开始通过 HTTP /2 连接以SubscribeToShardEvent类型的事件形式将记录从该分片推送给你。此连接将保持打开状态长达 5 分钟。如果要在调用返回的记录正常或异常SubscribeToShard完成后继续从分片接收记录,请SubscribeToShard再次调用。
future
注意
SubscribeToShard
API当到达当前分片的末尾时,还会返回当前分片的子分片列表。 -
要取消注册正在使用增强型扇出功能的消费者,请致电。DeregisterStreamConsumer
以下代码是一个示例,演示如何为消费端订阅分片、定期续订订阅以及处理事件。
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }
如果 event.ContinuationSequenceNumber
返回 null
,则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 CLOSED
状态,并且您已从其中读取了所有可用的数据记录。在这种情况下,按照上文示例所述,您可以使用 event.childShards
来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息,请参阅ChildShard。