Kembangkan konsumen fan-out yang disempurnakan dengan Kinesis Data Streams API - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Kembangkan konsumen fan-out yang disempurnakan dengan Kinesis Data Streams API

Fan-out yang disempurnakan adalah fitur Amazon Kinesis Data Streams yang memungkinkan konsumen menerima catatan dari aliran data dengan throughput khusus hingga 2 MB data per detik per pecahan. Konsumen yang menggunakan fan-out yang ditingkatkan tidak harus bersaing dengan konsumen lain yang menerima data dari streaming. Untuk informasi selengkapnya, lihat Kembangkan konsumen khusus dengan throughput khusus (fan-out yang ditingkatkan).

Anda dapat menggunakan API operasi untuk membangun konsumen yang menggunakan fan-out yang disempurnakan di Kinesis Data Streams.

Untuk mendaftarkan konsumen dengan fan-out yang disempurnakan menggunakan Kinesis Data Streams API
  1. Hubungi RegisterStreamConsumeruntuk mendaftarkan aplikasi Anda sebagai konsumen yang menggunakan fan-out yang disempurnakan. Kinesis Data Streams menghasilkan Amazon Resource ARN Name () untuk konsumen dan mengembalikannya dalam respons.

  2. Untuk mulai mendengarkan pecahan tertentu, berikan panggilan ke SubscribeToShardkonsumenARN. Kinesis Data Streams kemudian mulai mendorong catatan dari pecahan itu kepada Anda, dalam bentuk peristiwa bertipe HTTP melalui koneksi SubscribeToShardEvent/2. Koneksi tetap terbuka hingga 5 menit. Panggil SubscribeToShardlagi jika Anda ingin terus menerima catatan dari pecahan setelah future yang dikembalikan oleh panggilan untuk SubscribeToShardmenyelesaikan secara normal atau luar biasa.

    catatan

    SubscribeToShardAPIjuga mengembalikan daftar pecahan anak dari pecahan saat ini ketika akhir pecahan saat ini tercapai.

  3. Untuk membatalkan pendaftaran konsumen yang menggunakan fan-out yang ditingkatkan, hubungi. DeregisterStreamConsumer

Kode berikut adalah contoh bagaimana Anda dapat berlangganan konsumen Anda ke pecahan, memperbarui langganan secara berkala, dan menangani acara.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Jika event.ContinuationSequenceNumber kembalinull, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. Pecahan ini sekarang dalam CLOSED keadaan, dan Anda telah membaca semua catatan data yang tersedia dari pecahan ini. Dalam skenario ini, per contoh di atas, Anda dapat menggunakan event.childShards untuk mempelajari tentang pecahan anak baru dari pecahan yang sedang diproses yang dibuat oleh split atau merge. Untuk informasi lebih lanjut, lihat ChildShard.