

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
<a name="kinesis-record-processor-implementation-app-java"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Java. Untuk melihat referensi Javadoc, lihat topik [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) untuk Kelas. AmazonKinesisClient

Untuk mengunduh Java KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Untuk menemukan Java KCL di Apache Maven, buka halaman hasil pencarian [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Untuk mengunduh kode sampel untuk aplikasi konsumen Java KCL dari GitHub, buka halaman [proyek sampel KCL untuk Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis). GitHub 

Aplikasi sampel menggunakan [Apache Commons Logging.](http://commons.apache.org/proper/commons-logging/guide.html) Anda dapat mengubah konfigurasi logging dalam `configure` metode statis yang ditentukan dalam `AmazonKinesisApplicationSample.java` file. *Untuk informasi selengkapnya tentang cara menggunakan Apache Commons Logging dengan aplikasi Log4j dan AWS Java, lihat [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) di Panduan Pengembang.AWS SDK untuk Java *

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Jawa:

**Topics**
+ [Menerapkan metode IRecord Processor](#kinesis-record-processor-implementation-interface-java)
+ [Menerapkan pabrik kelas untuk antarmuka IRecord Processor](#kinesis-record-processor-implementation-factory-java)
+ [Buat pekerja](#kcl-java-worker)
+ [Ubah properti konfigurasi](#kinesis-record-processor-initialization-java)
+ [Migrasi ke Versi 2 dari antarmuka prosesor rekaman](#kcl-java-v2-migration)

## Menerapkan metode IRecord Processor
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL saat ini mendukung dua versi antarmuka`IRecordProcessor`: Antarmuka asli tersedia dengan versi pertama KCL, dan versi 2 tersedia dimulai dengan KCL versi 1.5.0. Kedua antarmuka didukung penuh. Pilihan Anda tergantung pada persyaratan skenario spesifik Anda. Lihat Javadocs yang dibuat secara lokal atau kode sumber untuk melihat semua perbedaannya. Bagian berikut menguraikan implementasi minimal untuk memulai.

**Topics**
+ [Antarmuka Asli (Versi 1)](#kcl-java-interface-original)
+ [Antarmuka yang diperbarui (Versi 2)](#kcl-java-interface-v2)

### Antarmuka Asli (Versi 1)
<a name="kcl-java-interface-original"></a>

`IRecordProcessor`Antarmuka asli (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) memperlihatkan metode prosesor rekaman berikut yang harus diterapkan konsumen Anda. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**menginisialisasi**  
KCL memanggil `initialize` metode ketika prosesor rekaman dipakai, melewati ID pecahan tertentu sebagai parameter. Prosesor rekaman ini hanya memproses pecahan ini dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Kinesis Data *Streams memiliki semantik setidaknya* sekali, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
KCL memanggil `processRecords` metode, melewati daftar catatan data dari pecahan yang ditentukan oleh metode. `initialize(shardId)` Prosesor rekaman memproses data dalam catatan ini sesuai dengan semantik konsumen. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `Record`Kelas mengekspos metode berikut yang menyediakan akses ke data catatan, nomor urut, dan kunci partisi. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Dalam sampel, metode privat `processRecordsWithRetries` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan meneruskan checkpointer () `IRecordProcessorCheckpointer` ke. `processRecords` Prosesor rekaman memanggil `checkpoint` metode pada antarmuka ini untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau merge, KCL tidak akan mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`processRecords`. Prosesor dapat, misalnya, memanggil `checkpoint` setiap panggilan ketiga ke`processRecords`. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Dalam sampel, metode pribadi `checkpoint` menunjukkan cara memanggil `IRecordProcessorCheckpointer.checkpoint` menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL bergantung pada `processRecords` untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Jika pengecualian dilemparkan`processRecords`, KCL melompati catatan data yang diteruskan sebelum pengecualian. Artinya, catatan ini tidak dikirim kembali ke prosesor rekaman yang melemparkan pengecualian atau ke prosesor rekaman lainnya di konsumen.

**penonaktifan**  
KCL memanggil `shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (alasan shutdown adalah). `ZOMBIE`

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `IRecordProcessorCheckpointer` antarmuka ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

### Antarmuka yang diperbarui (Versi 2)
<a name="kcl-java-interface-v2"></a>

`IRecordProcessor`Antarmuka (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) yang diperbarui memperlihatkan metode prosesor rekaman berikut yang harus diterapkan konsumen Anda: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Semua argumen dari versi asli antarmuka dapat diakses melalui metode get pada objek kontainer. Misalnya, untuk mengambil daftar catatan di`processRecords()`, Anda dapat menggunakan`processRecordsInput.getRecords()`.

Pada versi 2 antarmuka ini (KCL 1.5.0 dan yang lebih baru), input baru berikut tersedia selain input yang disediakan oleh antarmuka asli:

nomor urut awal  
Dalam `InitializationInput` objek yang diteruskan ke `initialize()` operasi, nomor urut awal dari mana catatan akan diberikan ke instance prosesor rekaman. Ini adalah nomor urut yang terakhir diperiksa oleh instance prosesor rekaman yang sebelumnya memproses pecahan yang sama. Ini disediakan jika aplikasi Anda membutuhkan informasi ini. 

nomor urut pos pemeriksaan yang tertunda  
Dalam `InitializationInput` objek yang diteruskan ke `initialize()` operasi, nomor urutan pos pemeriksaan yang tertunda (jika ada) yang tidak dapat dilakukan sebelum instance prosesor rekaman sebelumnya berhenti. 

## Menerapkan pabrik kelas untuk antarmuka IRecord Processor
<a name="kinesis-record-processor-implementation-factory-java"></a>

Anda juga perlu mengimplementasikan pabrik untuk kelas yang mengimplementasikan metode prosesor rekaman. Ketika konsumen Anda membuat instance pekerja, ia meneruskan referensi ke pabrik ini.

Sampel mengimplementasikan kelas pabrik dalam file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` menggunakan antarmuka prosesor rekaman asli. Jika Anda ingin pabrik kelas membuat prosesor rekaman versi 2, gunakan nama paket`com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Buat pekerja
<a name="kcl-java-worker"></a>

Seperti dibahas dalam[Menerapkan metode IRecord Processor](#kinesis-record-processor-implementation-interface-java), ada dua versi antarmuka prosesor rekaman KCL untuk dipilih, yang memengaruhi cara Anda membuat pekerja. Antarmuka prosesor rekaman asli menggunakan struktur kode berikut untuk membuat pekerja:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Dengan versi 2 dari antarmuka prosesor rekaman, Anda dapat menggunakan `Worker.Builder` untuk membuat pekerja tanpa perlu khawatir tentang konstruktor mana yang akan digunakan dan urutan argumen. Antarmuka prosesor rekaman yang diperbarui menggunakan struktur kode berikut untuk membuat pekerja:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-java"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Data konfigurasi untuk pekerja ini kemudian dikonsolidasikan dalam sebuah `KinesisClientLibConfiguration` objek. Objek ini dan referensi ke pabrik kelas untuk `IRecordProcessor` diteruskan dalam panggilan yang membuat instance pekerja. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri menggunakan file properti Java (lihat`AmazonKinesisApplicationSample.java`).

### Nama aplikasi
<a name="configuration-property-application-name"></a>

KCL memerlukan nama aplikasi yang unik di seluruh aplikasi Anda, dan di seluruh tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-cred-java"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Misalnya, jika Anda menjalankan konsumen pada instans EC2, sebaiknya Anda meluncurkan instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil bagi konsumen yang berjalan pada instans EC2.

Aplikasi sampel pertama kali mencoba untuk mengambil kredenal IAM dari metadata instance: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Jika aplikasi sampel tidak dapat memperoleh kredensil dari metadata instance, aplikasi tersebut mencoba mengambil kredensil dari file properti:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

*Untuk informasi selengkapnya tentang metadata instans, lihat [Metadata Instans di Panduan Pengguna](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) Amazon EC2.*

### Gunakan ID pekerja untuk beberapa instance
<a name="kinesis-record-processor-workerid-java"></a>

Kode inisialisasi sampel membuat ID untuk pekerja`workerId`, menggunakan nama komputer lokal dan menambahkan pengidentifikasi unik global seperti yang ditunjukkan dalam cuplikan kode berikut. Pendekatan ini mendukung skenario beberapa contoh aplikasi konsumen yang berjalan pada satu komputer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrasi ke Versi 2 dari antarmuka prosesor rekaman
<a name="kcl-java-v2-migration"></a>

Jika Anda ingin memigrasikan kode yang menggunakan antarmuka asli, selain langkah-langkah yang dijelaskan sebelumnya, langkah-langkah berikut diperlukan:

1. Ubah kelas prosesor rekaman Anda untuk mengimpor antarmuka prosesor rekaman versi 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Ubah referensi ke input untuk menggunakan `get` metode pada objek kontainer. Misalnya, dalam `shutdown()` operasi, ubah "`checkpointer`" menjadi "`shutdownInput.getCheckpointer()`”.

1. Ubah kelas pabrik prosesor rekaman Anda untuk mengimpor antarmuka pabrik prosesor rekaman versi 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Ubah konstruksi pekerja yang akan digunakan`Worker.Builder`. Contoh:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```