

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Mengembangkan konsumen dengan AWS SDK untuk Java
<a name="develop-consumers-sdk"></a>

 Anda dapat mengembangkan konsumen khusus menggunakan Amazon Kinesis APIs Data Streams. Bagian ini menjelaskan penggunaan Kinesis APIs Data AWS SDK untuk Java Streams dengan file.

**penting**  
Metode yang direkomendasikan untuk mengembangkan Kinesis Data Streams khusus yang dibagikan oleh konsumen adalah dengan menggunakan Kinesis Client Library (KCL). KCL membantu Anda mengkonsumsi dan memproses data dari aliran data Kinesis dengan menangani banyak tugas kompleks yang terkait dengan komputasi terdistribusi. Untuk informasi selengkapnya, lihat [Kembangkan konsumen dengan KCL di Jawa](develop-kcl-consumers-java.md).

**Topics**
+ [Mengembangkan konsumen throughput bersama dengan AWS SDK untuk Java](developing-consumers-with-sdk.md)
+ [Mengembangkan konsumen fan-out yang ditingkatkan dengan AWS SDK untuk Java](building-enhanced-consumers-api.md)
+ [Berinteraksi dengan data menggunakan AWS Glue Schema Registry](building-enhanced-consumers-glue-schema-registry.md)

# Mengembangkan konsumen throughput bersama dengan AWS SDK untuk Java
<a name="developing-consumers-with-sdk"></a>

Salah satu metode untuk mengembangkan Kinesis Data Streams kustom yang dibagikan oleh konsumen adalah dengan menggunakan Amazon APIs Kinesis Data Streams dengan file. AWS SDK untuk Java Bagian ini menjelaskan penggunaan Kinesis APIs Data AWS SDK untuk Java Streams dengan file. Anda dapat memanggil Kinesis APIs Data Streams menggunakan bahasa pemrograman lain yang berbeda. Untuk informasi selengkapnya tentang semua yang tersedia AWS SDKs, lihat [Mulai Mengembangkan dengan Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

Contoh kode Java di bagian ini menunjukkan cara melakukan operasi API Kinesis Data Streams dasar, dan dibagi secara logis berdasarkan jenis operasi. Contoh-contoh ini tidak mewakili kode siap produksi. Mereka tidak memeriksa semua kemungkinan pengecualian atau memperhitungkan semua kemungkinan pertimbangan keamanan atau kinerja. 

**Topics**
+ [Dapatkan data dari aliran](#kinesis-using-sdk-java-get-data)
+ [Gunakan iterator shard](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [Gunakan GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [Beradaptasi dengan reshard](#kinesis-using-sdk-java-get-data-reshard)

## Dapatkan data dari aliran
<a name="kinesis-using-sdk-java-get-data"></a>

Kinesis APIs Data Streams `getShardIterator` menyertakan dan metode `getRecords` yang dapat Anda panggil untuk mengambil catatan dari aliran data. Ini adalah model tarik, di mana kode Anda menarik catatan data langsung dari pecahan aliran data.

**penting**  
Kami menyarankan Anda menggunakan dukungan prosesor rekaman yang disediakan oleh KCL untuk mengambil catatan dari aliran data Anda. Ini adalah model push, di mana Anda menerapkan kode yang memproses data. KCL mengambil catatan data dari aliran data dan mengirimkannya ke kode aplikasi Anda. Selain itu, KCL menyediakan fungsionalitas failover, recovery, dan load balancing. Untuk informasi selengkapnya, lihat [Mengembangkan Konsumen Kustom dengan Throughput Bersama Menggunakan KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

Namun, dalam beberapa kasus Anda mungkin lebih suka menggunakan Kinesis APIs Data Streams. Misalnya, untuk menerapkan alat khusus untuk memantau atau men-debug aliran data Anda.

**penting**  
Kinesis Data Streams mendukung perubahan pada periode retensi rekaman data aliran data Anda. Untuk informasi selengkapnya, lihat [Ubah periode retensi data](kinesis-extended-retention.md).

## Gunakan iterator shard
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

Anda mengambil catatan dari aliran pada basis per-shard. *Untuk setiap pecahan, dan untuk setiap batch catatan yang Anda ambil dari pecahan itu, Anda harus mendapatkan iterator shard.* Iterator shard digunakan dalam `getRecordsRequest` objek untuk menentukan pecahan dari mana catatan akan diambil. Jenis yang terkait dengan iterator shard menentukan titik dalam pecahan dari mana catatan harus diambil (lihat nanti di bagian ini untuk lebih jelasnya). Sebelum Anda dapat bekerja dengan iterator shard, Anda harus mengambil shard. Untuk informasi selengkapnya, lihat [Daftar pecahan](kinesis-using-sdk-java-list-shards.md).

Dapatkan iterator shard awal menggunakan metode ini`getShardIterator`. Dapatkan iterator pecahan untuk kumpulan catatan tambahan menggunakan `getNextShardIterator` metode `getRecordsResult` objek yang dikembalikan oleh metode. `getRecords` Sebuah iterator shard berlaku selama 5 menit. Jika Anda menggunakan iterator shard saat valid, Anda mendapatkan yang baru. Setiap iterator shard tetap valid selama 5 menit, bahkan setelah digunakan.

Untuk mendapatkan iterator shard awal, buat instance `GetShardIteratorRequest` dan teruskan ke metode. `getShardIterator` Untuk mengkonfigurasi permintaan, tentukan stream dan ID shard. Untuk informasi tentang cara mendapatkan aliran di AWS akun Anda, lihat[Daftar aliran](kinesis-using-sdk-java-list-streams.md). Untuk informasi tentang cara mendapatkan pecahan dalam aliran, lihat[Daftar pecahan](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

Kode sampel ini menentukan `TRIM_HORIZON` sebagai tipe iterator ketika mendapatkan iterator shard awal. *Jenis iterator ini berarti bahwa catatan harus dikembalikan dimulai dengan catatan pertama ditambahkan ke shard—daripada dimulai dengan catatan yang paling baru ditambahkan, juga dikenal sebagai tip.* Berikut ini adalah jenis iterator yang mungkin:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

Untuk informasi selengkapnya, lihat [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Beberapa tipe iterator mengharuskan Anda menentukan nomor urut selain jenisnya; misalnya:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

Setelah Anda mendapatkan catatan menggunakan`getRecords`, Anda bisa mendapatkan nomor urut untuk catatan dengan memanggil `getSequenceNumber` metode catatan. 

```
record.getSequenceNumber()
```

Selain itu, kode yang menambahkan catatan ke aliran data bisa mendapatkan nomor urut untuk catatan tambahan dengan memanggil `getSequenceNumber` hasil`putRecord`. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

Anda dapat menggunakan nomor urut untuk menjamin peningkatan urutan catatan secara ketat. Untuk informasi selengkapnya, lihat contoh kode di[PutRecord contoh](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Gunakan GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

Setelah Anda mendapatkan iterator shard, buat instance objek. `GetRecordsRequest` Tentukan iterator untuk permintaan menggunakan `setShardIterator` metode. 

Secara opsional, Anda juga dapat mengatur jumlah catatan untuk diambil menggunakan metode ini`setLimit`. Jumlah catatan yang dikembalikan `getRecords` selalu sama dengan atau kurang dari batas ini. Jika Anda tidak menentukan batas ini, `getRecords` mengembalikan 10 MB catatan diambil. Kode contoh di bawah ini menetapkan batas ini menjadi 25 catatan.

Jika tidak ada catatan yang dikembalikan, itu berarti tidak ada catatan data saat ini tersedia dari pecahan ini pada nomor urut yang direferensikan oleh iterator pecahan. Dalam situasi ini, aplikasi Anda harus menunggu sejumlah waktu yang sesuai untuk sumber data untuk streaming. Kemudian cobalah untuk mendapatkan data dari pecahan lagi menggunakan iterator shard yang dikembalikan oleh panggilan sebelumnya ke. `getRecords` 

Lewati `getRecordsRequest` ke `getRecords` metode, dan tangkap nilai yang dikembalikan sebagai `getRecordsResult` objek. Untuk mendapatkan catatan data, panggil `getRecords` metode pada `getRecordsResult` objek. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

Untuk mempersiapkan panggilan lain`getRecords`, dapatkan iterator shard berikutnya dari. `getRecordsResult` 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

Untuk hasil terbaik, tidurlah setidaknya 1 detik (1.000 milidetik) di antara panggilan `getRecords` untuk menghindari melebihi batas frekuensi. `getRecords` 

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

Biasanya, Anda harus memanggil `getRecords` dalam satu lingkaran, bahkan ketika Anda mengambil satu catatan dalam skenario pengujian. Satu panggilan ke `getRecords` mungkin mengembalikan daftar catatan kosong, bahkan ketika pecahan berisi lebih banyak catatan di nomor urutan selanjutnya. Ketika ini terjadi, yang `NextShardIterator` dikembalikan bersama dengan daftar catatan kosong mereferensikan nomor urut selanjutnya dalam pecahan, dan `getRecords` panggilan berturut-turut akhirnya mengembalikan catatan. Sampel berikut menunjukkan penggunaan loop.

**Contoh: GetRecords**  
Contoh kode berikut mencerminkan `getRecords` tips di bagian ini, termasuk membuat panggilan dalam satu lingkaran.

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Jika Anda menggunakan Perpustakaan Klien Kinesis, mungkin akan melakukan beberapa panggilan sebelum mengembalikan data. Perilaku ini dirancang dan tidak menunjukkan masalah dengan KCL atau data Anda.

## Beradaptasi dengan reshard
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 Jika `getRecordsResult.getNextShardIterator` kembali`null`, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. Pecahan ini sekarang dalam `CLOSED` keadaan dan Anda telah membaca semua catatan data yang tersedia dari pecahan ini. 

 Dalam skenario ini, Anda dapat menggunakan `getRecordsResult.childShards` untuk mempelajari tentang pecahan anak baru dari pecahan yang sedang diproses yang dibuat oleh split atau merge. Untuk informasi selengkapnya, lihat [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

 Dalam kasus split, dua pecahan baru keduanya `parentShardId` sama dengan ID pecahan pecahan yang Anda proses sebelumnya. Nilai `adjacentParentShardId` untuk kedua pecahan ini adalah`null`. 

 Dalam kasus penggabungan, pecahan baru tunggal yang dibuat oleh penggabungan memiliki `parentShardId` sama dengan ID pecahan dari salah satu pecahan induk dan `adjacentParentShardId` sama dengan ID pecahan induk lainnya. Aplikasi Anda telah membaca semua data dari salah satu pecahan ini. Ini adalah pecahan yang `getRecordsResult.getNextShardIterator` dikembalikan`null`. Jika urutan data penting untuk aplikasi Anda, pastikan bahwa itu juga membaca semua data dari pecahan induk lainnya sebelum membaca data baru dari pecahan anak yang dibuat oleh penggabungan. 

 Jika Anda menggunakan beberapa prosesor untuk mengambil data dari aliran (katakanlah, satu prosesor per pecahan), dan pecahan pecahan atau penggabungan terjadi, sesuaikan jumlah prosesor ke atas atau ke bawah untuk beradaptasi dengan perubahan jumlah pecahan. 

 Untuk informasi lebih lanjut tentang resharding, termasuk diskusi tentang status pecahan — seperti —lihat. `CLOSED` [Reshard aliran](kinesis-using-sdk-java-resharding.md) 

# Mengembangkan konsumen fan-out yang ditingkatkan dengan AWS SDK untuk Java
<a name="building-enhanced-consumers-api"></a>

*Penggemar yang disempurnakan* adalah fitur Amazon Kinesis Data Streams yang memungkinkan konsumen menerima catatan dari aliran data dengan throughput khusus hingga 2 MB data per detik per pecahan. Konsumen yang menggunakan fan-out yang ditingkatkan tidak harus bersaing dengan konsumen lain yang menerima data dari streaming. Untuk informasi selengkapnya, lihat [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md).

Anda dapat menggunakan operasi API untuk membangun konsumen yang menggunakan fan-out yang disempurnakan di Kinesis Data Streams.

**Untuk mendaftarkan konsumen dengan fan-out yang disempurnakan menggunakan Kinesis Data Streams API**

1. Hubungi [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)untuk mendaftarkan aplikasi Anda sebagai konsumen yang menggunakan fan-out yang disempurnakan. Kinesis Data Streams menghasilkan Nama Sumber Daya Amazon (ARN) untuk konsumen dan mengembalikannya sebagai respons.

1. Untuk mulai mendengarkan pecahan tertentu, berikan ARN konsumen dalam panggilan ke. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) Kinesis Data Streams kemudian mulai mendorong catatan dari pecahan itu kepada Anda, dalam bentuk peristiwa bertipe melalui koneksi [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)HTTP/2. Koneksi tetap terbuka hingga 5 menit. Panggil [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)lagi jika Anda ingin terus menerima catatan dari pecahan setelah `future` yang dikembalikan oleh panggilan untuk [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)menyelesaikan secara normal atau luar biasa.
**catatan**  
`SubscribeToShard`API juga mengembalikan daftar pecahan anak dari pecahan saat ini saat akhir pecahan saat ini tercapai. 

1. Untuk membatalkan pendaftaran konsumen yang menggunakan fan-out yang ditingkatkan, hubungi. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

Kode berikut adalah contoh bagaimana Anda dapat berlangganan konsumen Anda ke pecahan, memperbarui langganan secara berkala, dan menangani acara.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 Jika `event.ContinuationSequenceNumber` kembali`null`, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. Pecahan ini sekarang dalam `CLOSED` keadaan, dan Anda telah membaca semua catatan data yang tersedia dari pecahan ini. Dalam skenario ini, per contoh di atas, Anda dapat menggunakan `event.childShards` untuk mempelajari tentang pecahan anak baru dari pecahan yang sedang diproses yang dibuat oleh split atau merge. Untuk informasi selengkapnya, lihat [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

# Berinteraksi dengan data menggunakan AWS Glue Schema Registry
<a name="building-enhanced-consumers-glue-schema-registry"></a>

Anda dapat mengintegrasikan aliran data Kinesis Anda dengan Registri Skema. AWS Glue Registri AWS Glue Skema memungkinkan Anda untuk menemukan, mengontrol, dan mengembangkan skema secara terpusat, sambil memastikan data yang dihasilkan terus divalidasi oleh skema terdaftar. Sebuah skema mendefinisikan struktur dan format catatan data. Sebuah skema adalah sebuah spesifikasi berversi untuk publikasi data yang handal, konsumsi, atau penyimpanan. Registri AWS Glue Skema memungkinkan Anda untuk meningkatkan kualitas end-to-end data dan tata kelola data dalam aplikasi streaming Anda. Untuk informasi selengkapnya, lihat [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Salah satu cara untuk mengatur integrasi ini adalah melalui `GetRecords` Kinesis Data Streams API yang tersedia AWS di Java SDK. 

Untuk petunjuk terperinci tentang cara mengatur integrasi Aliran Data Kinesis dengan Registri `GetRecords` Skema menggunakan Kinesis Data Streams, lihat bagian “Berinteraksi dengan Data Menggunakan Aliran Data APIs Kinesis” di Kasus [Penggunaan: Mengintegrasikan](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Amazon Kinesis Data APIs Streams dengan Registri Skema Glue. AWS 