

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Mengembangkan konsumen throughput bersama dengan AWS SDK untuk Java
<a name="developing-consumers-with-sdk"></a>

Salah satu metode untuk mengembangkan Kinesis Data Streams kustom yang dibagikan oleh konsumen adalah dengan menggunakan Amazon APIs Kinesis Data Streams dengan file. AWS SDK untuk Java Bagian ini menjelaskan penggunaan Kinesis APIs Data AWS SDK untuk Java Streams dengan file. Anda dapat memanggil Kinesis APIs Data Streams menggunakan bahasa pemrograman lain yang berbeda. Untuk informasi selengkapnya tentang semua yang tersedia AWS SDKs, lihat [Mulai Mengembangkan dengan Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

Contoh kode Java di bagian ini menunjukkan cara melakukan operasi API Kinesis Data Streams dasar, dan dibagi secara logis berdasarkan jenis operasi. Contoh-contoh ini tidak mewakili kode siap produksi. Mereka tidak memeriksa semua kemungkinan pengecualian atau memperhitungkan semua kemungkinan pertimbangan keamanan atau kinerja. 

**Topics**
+ [Dapatkan data dari aliran](#kinesis-using-sdk-java-get-data)
+ [Gunakan iterator shard](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [Gunakan GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [Beradaptasi dengan reshard](#kinesis-using-sdk-java-get-data-reshard)

## Dapatkan data dari aliran
<a name="kinesis-using-sdk-java-get-data"></a>

Kinesis APIs Data Streams `getShardIterator` menyertakan dan metode `getRecords` yang dapat Anda panggil untuk mengambil catatan dari aliran data. Ini adalah model tarik, di mana kode Anda menarik catatan data langsung dari pecahan aliran data.

**penting**  
Kami menyarankan Anda menggunakan dukungan prosesor rekaman yang disediakan oleh KCL untuk mengambil catatan dari aliran data Anda. Ini adalah model push, di mana Anda menerapkan kode yang memproses data. KCL mengambil catatan data dari aliran data dan mengirimkannya ke kode aplikasi Anda. Selain itu, KCL menyediakan fungsionalitas failover, recovery, dan load balancing. Untuk informasi selengkapnya, lihat [Mengembangkan Konsumen Kustom dengan Throughput Bersama Menggunakan KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

Namun, dalam beberapa kasus Anda mungkin lebih suka menggunakan Kinesis APIs Data Streams. Misalnya, untuk menerapkan alat khusus untuk memantau atau men-debug aliran data Anda.

**penting**  
Kinesis Data Streams mendukung perubahan pada periode retensi rekaman data aliran data Anda. Untuk informasi selengkapnya, lihat [Ubah periode retensi data](kinesis-extended-retention.md).

## Gunakan iterator shard
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

Anda mengambil catatan dari aliran pada basis per-shard. *Untuk setiap pecahan, dan untuk setiap batch catatan yang Anda ambil dari pecahan itu, Anda harus mendapatkan iterator shard.* Iterator shard digunakan dalam `getRecordsRequest` objek untuk menentukan pecahan dari mana catatan akan diambil. Jenis yang terkait dengan iterator shard menentukan titik dalam pecahan dari mana catatan harus diambil (lihat nanti di bagian ini untuk lebih jelasnya). Sebelum Anda dapat bekerja dengan iterator shard, Anda harus mengambil shard. Untuk informasi selengkapnya, lihat [Daftar pecahan](kinesis-using-sdk-java-list-shards.md).

Dapatkan iterator shard awal menggunakan metode ini`getShardIterator`. Dapatkan iterator pecahan untuk kumpulan catatan tambahan menggunakan `getNextShardIterator` metode `getRecordsResult` objek yang dikembalikan oleh metode. `getRecords` Sebuah iterator shard berlaku selama 5 menit. Jika Anda menggunakan iterator shard saat valid, Anda mendapatkan yang baru. Setiap iterator shard tetap valid selama 5 menit, bahkan setelah digunakan.

Untuk mendapatkan iterator shard awal, buat instance `GetShardIteratorRequest` dan teruskan ke metode. `getShardIterator` Untuk mengkonfigurasi permintaan, tentukan stream dan ID shard. Untuk informasi tentang cara mendapatkan aliran di AWS akun Anda, lihat[Daftar aliran](kinesis-using-sdk-java-list-streams.md). Untuk informasi tentang cara mendapatkan pecahan dalam aliran, lihat[Daftar pecahan](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

Kode sampel ini menentukan `TRIM_HORIZON` sebagai tipe iterator ketika mendapatkan iterator shard awal. *Jenis iterator ini berarti bahwa catatan harus dikembalikan dimulai dengan catatan pertama ditambahkan ke shard—daripada dimulai dengan catatan yang paling baru ditambahkan, juga dikenal sebagai tip.* Berikut ini adalah jenis iterator yang mungkin:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

Untuk informasi selengkapnya, lihat [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Beberapa tipe iterator mengharuskan Anda menentukan nomor urut selain jenisnya; misalnya:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

Setelah Anda mendapatkan catatan menggunakan`getRecords`, Anda bisa mendapatkan nomor urut untuk catatan dengan memanggil `getSequenceNumber` metode catatan. 

```
record.getSequenceNumber()
```

Selain itu, kode yang menambahkan catatan ke aliran data bisa mendapatkan nomor urut untuk catatan tambahan dengan memanggil `getSequenceNumber` hasil`putRecord`. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

Anda dapat menggunakan nomor urut untuk menjamin peningkatan urutan catatan secara ketat. Untuk informasi selengkapnya, lihat contoh kode di[PutRecord contoh](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Gunakan GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

Setelah Anda mendapatkan iterator shard, buat instance objek. `GetRecordsRequest` Tentukan iterator untuk permintaan menggunakan `setShardIterator` metode. 

Secara opsional, Anda juga dapat mengatur jumlah catatan untuk diambil menggunakan metode ini`setLimit`. Jumlah catatan yang dikembalikan `getRecords` selalu sama dengan atau kurang dari batas ini. Jika Anda tidak menentukan batas ini, `getRecords` mengembalikan 10 MB catatan diambil. Kode contoh di bawah ini menetapkan batas ini menjadi 25 catatan.

Jika tidak ada catatan yang dikembalikan, itu berarti tidak ada catatan data saat ini tersedia dari pecahan ini pada nomor urut yang direferensikan oleh iterator pecahan. Dalam situasi ini, aplikasi Anda harus menunggu sejumlah waktu yang sesuai untuk sumber data untuk streaming. Kemudian cobalah untuk mendapatkan data dari pecahan lagi menggunakan iterator shard yang dikembalikan oleh panggilan sebelumnya ke. `getRecords` 

Lewati `getRecordsRequest` ke `getRecords` metode, dan tangkap nilai yang dikembalikan sebagai `getRecordsResult` objek. Untuk mendapatkan catatan data, panggil `getRecords` metode pada `getRecordsResult` objek. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

Untuk mempersiapkan panggilan lain`getRecords`, dapatkan iterator shard berikutnya dari. `getRecordsResult` 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

Untuk hasil terbaik, tidurlah setidaknya 1 detik (1.000 milidetik) di antara panggilan `getRecords` untuk menghindari melebihi batas frekuensi. `getRecords` 

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

Biasanya, Anda harus memanggil `getRecords` dalam satu lingkaran, bahkan ketika Anda mengambil satu catatan dalam skenario pengujian. Satu panggilan ke `getRecords` mungkin mengembalikan daftar catatan kosong, bahkan ketika pecahan berisi lebih banyak catatan di nomor urutan selanjutnya. Ketika ini terjadi, yang `NextShardIterator` dikembalikan bersama dengan daftar catatan kosong mereferensikan nomor urut selanjutnya dalam pecahan, dan `getRecords` panggilan berturut-turut akhirnya mengembalikan catatan. Sampel berikut menunjukkan penggunaan loop.

**Contoh: GetRecords**  
Contoh kode berikut mencerminkan `getRecords` tips di bagian ini, termasuk membuat panggilan dalam satu lingkaran.

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Jika Anda menggunakan Perpustakaan Klien Kinesis, mungkin akan melakukan beberapa panggilan sebelum mengembalikan data. Perilaku ini dirancang dan tidak menunjukkan masalah dengan KCL atau data Anda.

## Beradaptasi dengan reshard
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 Jika `getRecordsResult.getNextShardIterator` kembali`null`, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. Pecahan ini sekarang dalam `CLOSED` keadaan dan Anda telah membaca semua catatan data yang tersedia dari pecahan ini. 

 Dalam skenario ini, Anda dapat menggunakan `getRecordsResult.childShards` untuk mempelajari tentang pecahan anak baru dari pecahan yang sedang diproses yang dibuat oleh split atau merge. Untuk informasi selengkapnya, lihat [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

 Dalam kasus split, dua pecahan baru keduanya `parentShardId` sama dengan ID pecahan pecahan yang Anda proses sebelumnya. Nilai `adjacentParentShardId` untuk kedua pecahan ini adalah`null`. 

 Dalam kasus penggabungan, pecahan baru tunggal yang dibuat oleh penggabungan memiliki `parentShardId` sama dengan ID pecahan dari salah satu pecahan induk dan `adjacentParentShardId` sama dengan ID pecahan induk lainnya. Aplikasi Anda telah membaca semua data dari salah satu pecahan ini. Ini adalah pecahan yang `getRecordsResult.getNextShardIterator` dikembalikan`null`. Jika urutan data penting untuk aplikasi Anda, pastikan bahwa itu juga membaca semua data dari pecahan induk lainnya sebelum membaca data baru dari pecahan anak yang dibuat oleh penggabungan. 

 Jika Anda menggunakan beberapa prosesor untuk mengambil data dari aliran (katakanlah, satu prosesor per pecahan), dan pecahan pecahan atau penggabungan terjadi, sesuaikan jumlah prosesor ke atas atau ke bawah untuk beradaptasi dengan perubahan jumlah pecahan. 

 Untuk informasi lebih lanjut tentang resharding, termasuk diskusi tentang status pecahan — seperti —lihat. `CLOSED` [Reshard aliran](kinesis-using-sdk-java-resharding.md) 