

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Mengembangkan konsumen fan-out yang ditingkatkan dengan AWS SDK untuk Java
<a name="building-enhanced-consumers-api"></a>

*Penggemar yang disempurnakan* adalah fitur Amazon Kinesis Data Streams yang memungkinkan konsumen menerima catatan dari aliran data dengan throughput khusus hingga 2 MB data per detik per pecahan. Konsumen yang menggunakan fan-out yang ditingkatkan tidak harus bersaing dengan konsumen lain yang menerima data dari streaming. Untuk informasi selengkapnya, lihat [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md).

Anda dapat menggunakan operasi API untuk membangun konsumen yang menggunakan fan-out yang disempurnakan di Kinesis Data Streams.

**Untuk mendaftarkan konsumen dengan fan-out yang disempurnakan menggunakan Kinesis Data Streams API**

1. Hubungi [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)untuk mendaftarkan aplikasi Anda sebagai konsumen yang menggunakan fan-out yang disempurnakan. Kinesis Data Streams menghasilkan Nama Sumber Daya Amazon (ARN) untuk konsumen dan mengembalikannya sebagai respons.

1. Untuk mulai mendengarkan pecahan tertentu, berikan ARN konsumen dalam panggilan ke. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) Kinesis Data Streams kemudian mulai mendorong catatan dari pecahan itu kepada Anda, dalam bentuk peristiwa bertipe melalui koneksi [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)HTTP/2. Koneksi tetap terbuka hingga 5 menit. Panggil [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)lagi jika Anda ingin terus menerima catatan dari pecahan setelah `future` yang dikembalikan oleh panggilan untuk [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)menyelesaikan secara normal atau luar biasa.
**catatan**  
`SubscribeToShard`API juga mengembalikan daftar pecahan anak dari pecahan saat ini saat akhir pecahan saat ini tercapai. 

1. Untuk membatalkan pendaftaran konsumen yang menggunakan fan-out yang ditingkatkan, hubungi. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

Kode berikut adalah contoh bagaimana Anda dapat berlangganan konsumen Anda ke pecahan, memperbarui langganan secara berkala, dan menangani acara.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 Jika `event.ContinuationSequenceNumber` kembali`null`, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. Pecahan ini sekarang dalam `CLOSED` keadaan, dan Anda telah membaca semua catatan data yang tersedia dari pecahan ini. Dalam skenario ini, per contoh di atas, Anda dapat menggunakan `event.childShards` untuk mempelajari tentang pecahan anak baru dari pecahan yang sedang diproses yang dibuat oleh split atau merge. Untuk informasi selengkapnya, lihat [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).