本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
增強型散發功能是 Amazon Kinesis Data Streams 的一項功能,使取用者從資料串流接收記錄時,專用輸送量可高達每個碎片每秒 2 MB 的資料。使用強化廣發功能的消費者不必與其他從串流接收資料的消費者競爭。如需詳細資訊,請參閱開發具有專用輸送量的增強型扇出消費者。
您可以使用 API 操作,為 Kinesis Data Streams 建置使用增強型散發功能的取用者。
使用 Kinesis Data Streams API 註冊具有增強型散發功能的取用者
-
呼叫 RegisterStreamConsumer 將您的應用程式註冊為使用增強型散發功能的取用者。Kinesis Data Streams 會為該取用者產生 Amazon Resource Name (ARN) 並隨回應傳回其值。
-
若要開始接聽特定碎片,請呼叫 SubscribeToShard 並傳遞取用者 ARN。Kinesis Data Streams 隨後會透過 HTTP/2 連線,開始從該碎片將記錄以 SubscribeToShardEvent 類型的事件形式推送給您。此連線將保持開啟長達 5 分鐘。若您希望於 SubscribeToShard 呼叫所傳回的
future
正常或異常完成後繼續從該碎片接收記錄,請再次呼叫 SubscribeToShard。注意
當到達當前碎片的末尾時,
SubscribeToShard
API 還返回當前碎片的子碎片清單。 -
若要將使用強化廣發功能的消費者取消註冊,請呼叫 DeregisterStreamConsumer。
以下範例程式碼示範如何為消費者訂閱碎片、定期續約訂閱和處理事件。
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import java.util.concurrent.CompletableFuture;
/**
* See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
* for complete code and more examples.
*/
public class SubscribeToShardSimpleImpl {
private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
private static final String SHARD_ID = "shardId-000000000000";
public static void main(String[] args) {
KinesisAsyncClient client = KinesisAsyncClient.create();
SubscribeToShardRequest request = SubscribeToShardRequest.builder()
.consumerARN(CONSUMER_ARN)
.shardId(SHARD_ID)
.startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
// Call SubscribeToShard iteratively to renew the subscription periodically.
while(true) {
// Wait for the CompletableFuture to complete normally or exceptionally.
callSubscribeToShardWithVisitor(client, request).join();
}
// Close the connection before exiting.
// client.close();
}
/**
* Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
*/
private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
System.out.println("Received subscribe to shard event " + event);
}
};
SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
.builder()
.onError(t -> System.err.println("Error during stream - " + t.getMessage()))
.subscriber(visitor)
.build();
return client.subscribeToShard(request, responseHandler);
}
}
如果 event.ContinuationSequenceNumber
傳回 null
,則表示發生了涉及此碎片的碎片分割或合併。此碎片現在處於 CLOSED
狀態,並且您已從此碎片讀取所有可用的資料記錄。在這個案例中,根據上述範例,您可以使用 event.childShards
來了解分割或合併所建立之正在處理之碎片的新子碎片。如需詳細資訊,請參閱 ChildShard。