

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Gunakan Perpustakaan Klien Kinesis
<a name="kcl"></a>

## Apa itu Perpustakaan Klien Kinesis?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) adalah pustaka perangkat lunak Java mandiri yang dirancang untuk menyederhanakan proses konsumsi dan pemrosesan data dari Amazon Kinesis Data Streams. KCL menangani banyak tugas kompleks yang terkait dengan komputasi terdistribusi, memungkinkan pengembang fokus pada penerapan logika bisnis mereka untuk memproses data. Ini mengelola aktivitas seperti penyeimbangan beban di beberapa pekerja, menanggapi kegagalan pekerja, memeriksa catatan yang diproses, dan menanggapi perubahan jumlah pecahan dalam aliran.

KCL sering diperbarui untuk menggabungkan versi yang lebih baru dari pustaka yang mendasari, peningkatan keamanan, dan perbaikan bug. Kami menyarankan Anda menggunakan versi terbaru KCL untuk menghindari masalah yang diketahui dan mendapatkan manfaat dari semua peningkatan terbaru. Untuk menemukan versi KCL terbaru, lihat [KCL](https://github.com/awslabs/amazon-kinesis-client) Github. 

**penting**  
Kami menyarankan Anda menggunakan versi KCL terbaru untuk menghindari bug dan masalah yang diketahui. Jika Anda menggunakan KCL 2.6.0 atau sebelumnya, tingkatkan ke KCL 2.6.1 atau yang lebih baru untuk menghindari kondisi langka yang dapat memblokir pemrosesan pecahan saat kapasitas aliran berubah. 
KCL adalah perpustakaan Java. Support untuk bahasa selain Java disediakan menggunakan daemon berbasis Java yang disebut. MultiLangDaemon MultiLangDaemonberinteraksi dengan aplikasi KCL melalui STDIN dan STDOUT. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, lihat[Mengembangkan konsumen dengan KCL dalam bahasa non-Jawa](develop-kcl-consumers-non-java.md).
Jangan gunakan AWS SDK untuk Java versi 2.27.19 hingga 2.27.23 dengan KCL 3.x. Versi ini menyertakan masalah yang menyebabkan kesalahan pengecualian terkait dengan penggunaan DynamoDB KCL. Kami menyarankan Anda menggunakan AWS SDK untuk Java versi 2.28.0 atau yang lebih baru untuk menghindari masalah ini. 

## Fitur dan manfaat utama KCL
<a name="kcl-benefits"></a>

Berikut ini adalah fitur utama dan manfaat terkait dari KCL:
+ **Skalabilitas**: KCL memungkinkan aplikasi untuk menskalakan secara dinamis dengan mendistribusikan beban pemrosesan ke beberapa pekerja. Anda dapat menskalakan aplikasi masuk atau keluar, secara manual atau dengan auto-scaling, tanpa khawatir tentang redistribusi beban.
+ **Load balancing**: KCL secara otomatis menyeimbangkan beban pemrosesan di seluruh pekerja yang tersedia, menghasilkan distribusi pekerjaan yang merata di seluruh pekerja.
+ **Checkpointing**: KCL mengelola checkpointing catatan yang diproses, memungkinkan aplikasi untuk melanjutkan pemrosesan dari posisi terakhir yang berhasil diproses.
+ **Toleransi kesalahan**: KCL menyediakan mekanisme toleransi kesalahan bawaan, memastikan bahwa pemrosesan data berlanjut bahkan jika pekerja individu gagal. KCL juga menyediakan at-least-once pengiriman.
+ **Menangani perubahan tingkat aliran**: KCL beradaptasi dengan pecahan pecahan dan penggabungan yang mungkin terjadi karena perubahan volume data. Ini mempertahankan pemesanan dengan memastikan bahwa pecahan anak diproses hanya setelah pecahan induknya selesai dan diperiksa.
+ **Pemantauan**: KCL terintegrasi dengan Amazon CloudWatch untuk pemantauan tingkat konsumen.
+ **Dukungan multi-bahasa**: KCL secara native mendukung Java dan memungkinkan beberapa bahasa pemrograman non-Java melalui. MultiLangDaemon

# Konsep KCL
<a name="kcl-concepts"></a>

Bagian ini menjelaskan konsep inti dan interaksi Kinesis Client Library (KCL). Konsep-konsep ini sangat penting untuk mengembangkan dan mengelola aplikasi konsumen KCL.
+ **Aplikasi konsumen KCL — aplikasi** yang dibuat khusus yang dirancang untuk membaca dan memproses catatan dari aliran data Kinesis menggunakan Perpustakaan Klien Kinesis.
+ **Worker** — Aplikasi konsumen KCL biasanya didistribusikan, dengan satu atau lebih pekerja berjalan secara bersamaan. KCL mengkoordinasikan pekerja untuk mengkonsumsi data dari aliran secara terdistribusi dan menyeimbangkan beban secara merata di beberapa pekerja.
+ **Scheduler** — kelas tingkat tinggi yang digunakan pekerja KCL untuk mulai memproses data. Setiap pekerja KCL memiliki satu penjadwal. Penjadwal menginisialisasi dan mengawasi berbagai tugas, termasuk menyinkronkan informasi pecahan dari aliran data Kinesis, melacak tugas pecahan di antara pekerja, dan memproses data dari aliran berdasarkan pecahan yang ditetapkan ke pekerja. Scheduler dapat mengambil berbagai konfigurasi yang memengaruhi perilaku penjadwal, seperti nama aliran untuk diproses dan kredensyal. AWS Scheduler memulai pengiriman catatan data dari aliran ke prosesor rekaman.
+ **Record processor** — mendefinisikan logika bagaimana aplikasi konsumen KCL Anda memproses data yang diterimanya dari aliran data. Anda harus menerapkan logika pemrosesan data kustom Anda sendiri di prosesor rekaman. Seorang pekerja KCL membuat instance penjadwal. Penjadwal kemudian membuat instance satu prosesor rekaman untuk setiap pecahan yang disewakan. Seorang pekerja dapat menjalankan beberapa prosesor rekaman.
+ **Sewa** - mendefinisikan penugasan antara pekerja dan pecahan. Aplikasi konsumen KCL menggunakan sewa untuk mendistribusikan pemrosesan catatan data di beberapa pekerja. Setiap pecahan terikat hanya untuk satu pekerja dengan sewa pada waktu tertentu dan setiap pekerja dapat memegang satu atau lebih sewa secara bersamaan. Ketika seorang pekerja berhenti memegang sewa karena berhenti atau gagal, KCL menugaskan pekerja lain untuk mengambil sewa. Untuk mempelajari lebih lanjut tentang sewa, lihat [Dokumentasi Github:](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle) Siklus Hidup Sewa.
+ **Tabel sewa - adalah tabel** Amazon DynamoDB unik yang digunakan untuk melacak semua sewa untuk aplikasi konsumen KCL. Setiap aplikasi konsumen KCL membuat tabel sewa sendiri. Tabel sewa digunakan untuk mempertahankan status di semua pekerja untuk mengoordinasikan pemrosesan data. Untuk informasi selengkapnya, lihat [Tabel metadata DynamoDB dan load balancing di KCL](kcl-dynamoDB.md).
+ **Checkpointing** — adalah proses terus-menerus menyimpan posisi catatan terakhir yang berhasil diproses dalam pecahan. KCL mengelola checkpointing untuk memastikan bahwa pemrosesan dapat dilanjutkan dari posisi checkpoint terakhir jika pekerja gagal atau aplikasi restart. Pos pemeriksaan disimpan dalam tabel sewa DynamoDB sebagai bagian dari metadata sewa. Hal ini memungkinkan pekerja untuk melanjutkan pemrosesan dari tempat pekerja sebelumnya berhenti.

# Tabel metadata DynamoDB dan load balancing di KCL
<a name="kcl-dynamoDB"></a>

KCL mengelola metadata seperti sewa dan metrik pemanfaatan CPU dari pekerja. KCL melacak metadata ini menggunakan tabel DynamoDB. Untuk setiap aplikasi Amazon Kinesis Data Streams, KCL membuat tiga tabel DynamoDB untuk mengelola metadata: tabel sewa, tabel metrik pekerja, dan tabel status koordinator.

**catatan**  
*KCL 3.x memperkenalkan dua tabel metadata baru: *metrik pekerja* dan tabel status koordinator.*

**penting**  
 Anda harus menambahkan izin yang tepat untuk aplikasi KCL untuk membuat dan mengelola tabel metadata di DynamoDB. Lihat perinciannya di [Izin IAM diperlukan untuk aplikasi konsumen KCL](kcl-iam-permissions.md).  
Aplikasi konsumen KCL tidak secara otomatis menghapus ketiga tabel metadata DynamoDB ini. Pastikan Anda menghapus tabel metadata DynamoDB ini yang dibuat oleh aplikasi konsumen KCL saat Anda menonaktifkan aplikasi konsumen Anda untuk mencegah biaya yang tidak perlu.

## Meja sewa
<a name="kcl-leasetable"></a>

Tabel sewa adalah tabel Amazon DynamoDB unik yang digunakan untuk melacak pecahan yang disewa dan diproses oleh penjadwal aplikasi konsumen KCL. Setiap aplikasi konsumen KCL membuat tabel sewa sendiri. KCL menggunakan nama aplikasi konsumen untuk nama tabel sewa secara default. Anda dapat mengatur nama tabel kustom menggunakan konfigurasi. KCL juga menciptakan [indeks sekunder global](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) pada tabel sewa dengan kunci partisi LeaseOwner untuk penemuan sewa yang efisien. Indeks sekunder global mencerminkan atribut LeaseKey dari tabel sewa dasar. Jika tabel sewa untuk aplikasi konsumen KCL Anda tidak ada saat aplikasi dimulai, salah satu pekerja membuat tabel sewa untuk aplikasi Anda.

Anda dapat melihat tabel sewa menggunakan konsol [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) saat aplikasi konsumen sedang berjalan.

**penting**  
Setiap nama aplikasi konsumen KCL harus unik untuk mencegah duplikasi nama tabel sewa. 
Akun Anda dikenakan biaya untuk biaya yang terkait dengan tabel DynamoDB, selain biaya yang terkait dengan Kinesis Data Streams itu sendiri. 

Setiap baris dalam tabel sewa mewakili pecahan yang sedang diproses oleh penjadwal aplikasi konsumen Anda. Bidang utama meliputi yang berikut:
+ **LeaseKey**: Untuk pemrosesan aliran tunggal, ini adalah ID pecahan. Untuk pemrosesan multi-stream dengan KCL, ini terstruktur sebagai. `account-id:StreamName:streamCreationTimestamp:ShardId` leaseKey adalah kunci partisi dari tabel sewa. Untuk informasi selengkapnya tentang pemrosesan multi-aliran, lihat[Pemrosesan multi-aliran dengan KCL](kcl-multi-stream.md).
+ **pos pemeriksaan:** Nomor urutan pos pemeriksaan terbaru untuk pecahan. 
+ **checkpointSubSequenceNomor:** Saat menggunakan fitur agregasi Perpustakaan Produser Kinesis, ini adalah ekstensi ke **pos pemeriksaan** yang melacak catatan pengguna individu dalam catatan Kinesis.
+ **LeaseCounter**: Digunakan untuk memeriksa apakah seorang pekerja saat ini memproses sewa secara aktif. LeaseCounter meningkat jika kepemilikan sewa ditransfer ke pekerja lain.
+ **LeaseOwner**: Pekerja saat ini yang memegang sewa ini.
+ **ownerSwitchesSincePos pemeriksaan:** Berapa kali sewa ini telah berganti pekerja sejak pos pemeriksaan terakhir.
+ **parentShardId:** ID dari induk pecahan ini. Pastikan pecahan induk diproses sepenuhnya sebelum pemrosesan dimulai pada pecahan anak, mempertahankan urutan pemrosesan catatan yang benar.
+ **childShardId:** Daftar pecahan anak IDs yang dihasilkan dari pecahan atau penggabungan pecahan ini. Digunakan untuk melacak garis keturunan pecahan dan mengelola urutan pemrosesan selama operasi resharding.
+ **startingHashKey:** Batas bawah kisaran kunci hash untuk pecahan ini.
+ **endingHashKey:** Batas atas rentang kunci hash untuk pecahan ini.

Jika Anda menggunakan pemrosesan multi-aliran dengan KCL, Anda melihat dua bidang tambahan berikut dalam tabel sewa. Untuk informasi selengkapnya, lihat [Pemrosesan multi-aliran dengan KCL](kcl-multi-stream.md).
+ **ShardID:** ID pecahan.
+ **StreamName:** Pengidentifikasi aliran data dalam format berikut:. `account-id:StreamName:streamCreationTimestamp`

## Tabel metrik pekerja
<a name="kcl-worker-metrics-table"></a>

Tabel metrik Worker adalah tabel Amazon DynamoDB unik untuk setiap aplikasi KCL dan digunakan untuk merekam metrik pemanfaatan CPU dari setiap pekerja. Metrik ini akan digunakan oleh KCL untuk melakukan penugasan sewa yang efisien untuk menghasilkan pemanfaatan sumber daya yang seimbang di seluruh pekerja. KCL menggunakan `KCLApplicationName-WorkerMetricStats` nama tabel metrik pekerja secara default.

## Tabel negara koordinator
<a name="kcl-coordinator-state-table"></a>

Tabel status koordinator adalah tabel Amazon DynamoDB unik untuk setiap aplikasi KCL dan digunakan untuk menyimpan informasi status internal untuk pekerja. Misalnya, tabel negara koordinator menyimpan data mengenai pemilihan pemimpin atau metadata yang terkait dengan migrasi di tempat dari KCL 2.x ke KCL 3.x. KCL menggunakan `KCLApplicationName-CoordinatorState` nama tabel status koordinator secara default.

## Mode kapasitas DynamoDB untuk tabel metadata yang dibuat oleh KCL
<a name="kcl-capacity-mode"></a>

[Secara default, Kinesis Client Library (KCL) membuat tabel metadata DynamoDB seperti tabel sewa, tabel metrik pekerja, dan tabel status koordinator menggunakan mode kapasitas sesuai permintaan.](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html) Mode ini secara otomatis menskalakan kapasitas baca dan tulis untuk mengakomodasi lalu lintas tanpa memerlukan perencanaan kapasitas. Kami sangat menyarankan Anda untuk menjaga mode kapasitas sebagai mode sesuai permintaan untuk pengoperasian tabel metadata ini dengan lebih efisien.

Jika Anda memutuskan untuk mengganti tabel sewa ke [mode kapasitas yang disediakan](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), ikuti praktik terbaik berikut:
+ Menganalisis pola penggunaan:
  + Pantau pola dan penggunaan baca dan tulis aplikasi Anda (RCU, WCU) menggunakan metrik Amazon. CloudWatch 
  + Memahami persyaratan throughput puncak dan rata-rata.
+ Hitung kapasitas yang dibutuhkan:
  + Perkirakan unit kapasitas baca (RCUs) dan tulis unit kapasitas (WCUs) berdasarkan analisis Anda.
  + Pertimbangkan faktor-faktor seperti jumlah pecahan, frekuensi pos pemeriksaan, dan jumlah pekerja.
+ Menerapkan penskalaan otomatis:
  + Gunakan penskalaan [otomatis DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) untuk menyesuaikan kapasitas yang disediakan secara otomatis dan menetapkan batas kapasitas minimum dan maksimum yang sesuai. 
  + Penskalaan otomatis DynamoDB akan membantu menghindari tabel metadata KCL Anda mencapai batas kapasitas dan terhambat.
+ Pemantauan dan pengoptimalan rutin:
  + Terus memantau CloudWatch metrik untuk`ThrottledRequests`.
  + Sesuaikan kapasitas saat beban kerja Anda berubah seiring waktu.

Jika Anda mengalami tabel DynamoDB `ProvisionedThroughputExceededException` dalam metadata untuk aplikasi konsumen KCL Anda, Anda harus meningkatkan kapasitas throughput yang disediakan dari tabel DynamoDB. Jika Anda menetapkan tingkat tertentu unit kapasitas baca (RCU) dan unit kapasitas tulis (WCU) saat pertama kali membuat aplikasi konsumen Anda, itu mungkin tidak cukup seiring dengan bertambahnya penggunaan Anda. Misalnya, jika aplikasi konsumen KCL Anda sering melakukan pemeriksaan atau beroperasi pada aliran dengan banyak pecahan, Anda mungkin memerlukan lebih banyak unit kapasitas. [Untuk informasi tentang throughput yang disediakan di DynamoDB, lihat [kapasitas throughput DynamoDB dan memperbarui tabel di Panduan Pengembang Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html).](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable)

## Bagaimana KCL memberikan sewa kepada pekerja dan menyeimbangkan beban
<a name="kcl-assign-leases"></a>

KCL terus mengumpulkan dan memantau metrik pemanfaatan CPU dari host komputasi yang menjalankan pekerja untuk memastikan distribusi beban kerja yang merata. Metrik pemanfaatan CPU ini disimpan dalam tabel metrik pekerja di DynamoDB. Jika KCL mendeteksi bahwa beberapa pekerja menunjukkan tingkat pemanfaatan CPU yang lebih tinggi dibandingkan dengan yang lain, itu akan menetapkan kembali sewa di antara pekerja untuk menurunkan beban pada pekerja yang sangat digunakan. Tujuannya adalah untuk menyeimbangkan beban kerja secara lebih merata di seluruh armada aplikasi konsumen, mencegah setiap pekerja menjadi kelebihan beban. Saat KCL mendistribusikan pemanfaatan CPU di seluruh armada aplikasi konsumen, Anda dapat mengukur kapasitas armada aplikasi konsumen Anda dengan memilih jumlah pekerja yang tepat atau menggunakan penskalaan otomatis untuk mengelola kapasitas komputasi secara efisien guna mencapai biaya yang lebih rendah.

**penting**  
KCL dapat mengumpulkan metrik pemanfaatan CPU dari pekerja hanya jika prasyarat tertentu terpenuhi. Lihat perinciannya di [Prasyarat](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). Jika KCL tidak dapat mengumpulkan metrik pemanfaatan CPU dari pekerja, KCL akan kembali menggunakan throughput per pekerja untuk menetapkan sewa dan menyeimbangkan beban di seluruh pekerja di armada. KCL akan memantau throughput yang diterima setiap pekerja pada waktu tertentu dan menetapkan kembali sewa untuk memastikan bahwa setiap pekerja mendapatkan tingkat throughput total yang sama dari sewa yang ditugaskan.

# Kembangkan konsumen dengan KCL
<a name="develop-kcl-consumers"></a>

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi konsumen yang memproses data dari aliran data Kinesis Anda.

KCL tersedia dalam berbagai bahasa. Topik ini mencakup bagaimana mengembangkan konsumen KCL dalam bahasa Jawa dan non-Jawa.
+ [Untuk melihat referensi Javadoc Perpustakaan Klien Kinesis, lihat Javadoc Perpustakaan Klien Amazon Kinesis.](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)
+ Untuk mengunduh KCL untuk Java dari GitHub, lihat [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) untuk Java.
+ Untuk menemukan KCL untuk Java di Apache Maven, lihat [KCL](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client) Maven Central Repository.

**Topics**
+ [Kembangkan konsumen dengan KCL di Jawa](develop-kcl-consumers-java.md)
+ [Mengembangkan konsumen dengan KCL dalam bahasa non-Jawa](develop-kcl-consumers-non-java.md)

# Kembangkan konsumen dengan KCL di Jawa
<a name="develop-kcl-consumers-java"></a>

## Prasyarat
<a name="develop-kcl-consumers-java-prerequisites"></a>

Sebelum Anda mulai menggunakan KCL 3.x, pastikan Anda memiliki yang berikut:
+ Java Development Kit (JDK) 8 atau lebih baru
+ AWS SDK untuk Java 2.x
+ Maven atau Gradle untuk manajemen ketergantungan

KCL mengumpulkan metrik pemanfaatan CPU seperti pemanfaatan CPU dari host komputasi yang dijalankan pekerja untuk menyeimbangkan beban untuk mencapai tingkat pemanfaatan sumber daya yang merata di seluruh pekerja. Untuk mengaktifkan KCL mengumpulkan metrik pemanfaatan CPU dari pekerja, Anda harus memenuhi prasyarat berikut:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Sistem operasi Anda harus OS Linux.
+ Anda harus mengaktifkan [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)di instans EC2 Anda.

 **Amazon Elastic Container Service (Amazon ECS) di Amazon EC2**
+ Sistem operasi Anda harus OS Linux.
+ Anda harus mengaktifkan titik [akhir metadata tugas ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html) versi 4. 
+ Versi agen penampung Amazon ECS Anda harus 1.39.0 atau yang lebih baru.

 **Amazon ECS aktif AWS Fargate**
+ Anda harus mengaktifkan titik [akhir metadata tugas Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) versi 4. Jika Anda menggunakan platform Fargate versi 1.4.0 atau yang lebih baru, ini diaktifkan secara default. 
+ Platform Fargate versi 1.4.0 atau yang lebih baru.

 **Layanan Amazon Elastic Kubernetes (Amazon EKS) di Amazon EC2** 
+ Sistem operasi Anda harus OS Linux.

 **Amazon EKS di AWS Fargate**
+ Platform Fargate 1.3.0 atau yang lebih baru.

**penting**  
Jika KCL tidak dapat mengumpulkan metrik pemanfaatan CPU dari pekerja, KCL akan kembali menggunakan throughput per pekerja untuk menetapkan sewa dan menyeimbangkan beban di seluruh pekerja di armada. Untuk informasi selengkapnya, lihat [Bagaimana KCL memberikan sewa kepada pekerja dan menyeimbangkan beban](kcl-dynamoDB.md#kcl-assign-leases).

## Instal dan tambahkan dependensi
<a name="develop-kcl-consumers-java-installation"></a>

Jika Anda menggunakan Maven, tambahkan dependensi berikut ke file Anda. `pom.xml` Pastikan Anda mengganti 3.xx ke versi KCL terbaru. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Jika Anda menggunakan Gradle, tambahkan berikut ini ke `build.gradle` file Anda. Pastikan Anda mengganti 3.xx ke versi KCL terbaru. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Anda dapat memeriksa versi terbaru KCL di Repositori Pusat [Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Menerapkan konsumen
<a name="develop-kcl-consumers-java-implemetation"></a>

Aplikasi konsumen KCL terdiri dari komponen-komponen kunci berikut:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Penjadwal](#implementation-scheduler)
+ [Aplikasi Konsumen Utama](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor adalah komponen inti di mana logika bisnis Anda untuk memproses catatan aliran data Kinesis berada. Ini mendefinisikan bagaimana aplikasi Anda memproses data yang diterimanya dari aliran Kinesis.

Tanggung jawab utama:
+ Inisialisasi pemrosesan untuk pecahan
+ Memproses kumpulan catatan dari aliran Kinesis
+ Pemrosesan shutdown untuk pecahan (misalnya, ketika pecahan pecah atau bergabung, atau sewa diserahkan ke host lain)
+ Menangani checkpointing untuk melacak kemajuan

Berikut ini menunjukkan contoh implementasi:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Berikut ini adalah penjelasan rinci dari setiap metode yang digunakan dalam contoh:

**inisialisasi (Inisialisasi InitializationInput Input)**
+ Tujuan: Siapkan sumber daya atau status yang diperlukan untuk memproses catatan.
+ Ketika disebut: Sekali, ketika KCL menetapkan pecahan ke prosesor rekaman ini.
+ Poin kunci:
  + `initializationInput.shardId()`: ID pecahan yang akan ditangani prosesor ini.
  + `initializationInput.extendedSequenceNumber()`: Nomor urut untuk mulai memproses dari.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Tujuan: Memproses catatan yang masuk dan kemajuan pos pemeriksaan opsional.
+ Ketika disebut: Berulang kali, selama prosesor rekaman memegang sewa untuk pecahan.
+ Poin kunci:
  + `processRecordsInput.records()`: Daftar catatan untuk diproses.
  + `processRecordsInput.checkpointer()`: Digunakan untuk memeriksa kemajuan.
  + Pastikan Anda menangani pengecualian apa pun selama pemrosesan untuk mencegah KCL gagal.
  + Metode ini harus idempoten, karena catatan yang sama dapat diproses lebih dari sekali dalam beberapa skenario, seperti data yang belum diperiksa sebelum pekerja yang tidak terduga mogok atau restart.
  + Selalu siram data buffer sebelum checkpointing untuk memastikan konsistensi data.

**LeaseLost () LeaseLostInput leaseLostInput**
+ Tujuan: Bersihkan sumber daya khusus untuk memproses pecahan ini.
+ Ketika itu disebut: Ketika Scheduler lain mengambil alih sewa untuk pecahan ini.
+ Poin kunci:
  + Checkpointing tidak diperbolehkan dalam metode ini.

**shardEnded () ShardEndedInput shardEndedInput**
+ Tujuan: Selesaikan pemrosesan untuk pecahan dan pos pemeriksaan ini.
+ Saat disebut: Ketika pecahan terbelah atau bergabung, menunjukkan semua data untuk pecahan ini telah diproses.
+ Poin kunci:
  + `shardEndedInput.checkpointer()`: Digunakan untuk melakukan pemeriksaan akhir.
  + Checkpointing dalam metode ini adalah wajib untuk menyelesaikan pemrosesan.
  + Gagal menyiram data dan pos pemeriksaan di sini dapat mengakibatkan hilangnya data atau pemrosesan duplikat saat pecahan dibuka kembali.

**ShutdownRequested () ShutdownRequestedInput shutdownRequestedInput**
+ Tujuan: Pos pemeriksaan dan bersihkan sumber daya saat KCL dimatikan.
+ Ketika dipanggil: Ketika KCL dimatikan, misalnya, ketika aplikasi dihentikan).
+ Poin kunci:
  + `shutdownRequestedInput.checkpointer()`: Digunakan untuk melakukan checkpointing sebelum shutdown.
  + Pastikan Anda menerapkan checkpointing dalam metode sehingga kemajuan disimpan sebelum aplikasi berhenti.
  + Kegagalan untuk menyiram data dan pos pemeriksaan di sini dapat mengakibatkan hilangnya data atau pemrosesan ulang catatan saat aplikasi dimulai ulang.

**penting**  
KCL 3.x memastikan lebih sedikit pemrosesan ulang data saat sewa diserahkan dari satu pekerja ke pekerja lain dengan pos pemeriksaan sebelum pekerja sebelumnya dimatikan. Jika Anda tidak menerapkan logika checkpointing dalam `shutdownRequested()` metode, Anda tidak akan melihat manfaat ini. Pastikan Anda telah menerapkan logika checkpointing di dalam metode. `shutdownRequested()`

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory bertanggung jawab untuk membuat RecordProcessor instance baru. KCL menggunakan pabrik ini untuk membuat yang baru RecordProcessor untuk setiap pecahan yang perlu diproses aplikasi.

Tanggung jawab utama:
+ Buat RecordProcessor instance baru sesuai permintaan
+ Pastikan masing-masing RecordProcessor diinisialisasi dengan benar

Berikut ini adalah contoh implementasi:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

Dalam contoh ini, pabrik membuat yang baru SampleRecordProcessor setiap kali shardRecordProcessor () dipanggil. Anda dapat memperluas ini untuk memasukkan logika inisialisasi yang diperlukan.

### Penjadwal
<a name="implementation-scheduler"></a>

Scheduler adalah komponen tingkat tinggi yang mengoordinasikan semua aktivitas aplikasi KCL. Ini bertanggung jawab atas orkestrasi keseluruhan pemrosesan data.

Tanggung jawab utama:
+ Mengelola siklus hidup RecordProcessors
+ Menangani manajemen sewa untuk pecahan
+ Mengkoordinasikan pos pemeriksaan
+ Menyeimbangkan beban pemrosesan pecahan di beberapa pekerja aplikasi Anda
+ Menangani sinyal shutdown dan penghentian aplikasi yang anggun

Scheduler biasanya dibuat dan dimulai di Aplikasi Utama. Anda dapat memeriksa contoh implementasi Scheduler di bagian berikut, Aplikasi Konsumen Utama. 

### Aplikasi Konsumen Utama
<a name="implementation-main"></a>

Aplikasi Konsumen Utama mengikat semua komponen bersama-sama. Ini bertanggung jawab untuk menyiapkan konsumen KCL, menciptakan klien yang diperlukan, mengonfigurasi Scheduler, dan mengelola siklus hidup aplikasi.

Tanggung jawab utama:
+ Mengatur klien AWS layanan (Kinesis, DynamoDB,) CloudWatch
+ Konfigurasikan aplikasi KCL
+ Buat dan mulai Scheduler
+ Menangani shutdown aplikasi

Berikut ini adalah contoh implementasi:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL menciptakan konsumen Enhanced Fan-out (EFO) dengan throughput khusus secara default. Untuk informasi selengkapnya tentang Enhanced Fan-out, lihat. [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md) Jika Anda memiliki kurang dari 2 konsumen atau tidak memerlukan penundaan propagasi baca di bawah 200 ms, Anda harus mengatur konfigurasi berikut di objek scheduler untuk menggunakan konsumen throughput bersama:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

Kode berikut adalah contoh pembuatan objek penjadwal yang menggunakan konsumen throughput bersama:

**Impor**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Kode**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Mengembangkan konsumen dengan KCL dalam bahasa non-Jawa
<a name="develop-kcl-consumers-non-java"></a>

Bagian ini mencakup implementasi konsumen menggunakan Kinesis Client Library (KCL) di Python, Node.js, .NET, dan Ruby.

KCL adalah perpustakaan Java. Support untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. `MultiLangDaemon` Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan KCL dengan bahasa selain Java. Oleh karena itu, jika Anda menginstal KCL untuk bahasa non-Java dan menulis aplikasi konsumen Anda sepenuhnya dalam bahasa non-Java, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. `MultiLangDaemon` Selanjutnya, `MultiLangDaemon` memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda (misalnya, wilayah AWS yang terhubung dengannya). Untuk informasi selengkapnya tentang `MultiLangDaemon` on GitHub, lihat [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Sementara konsep inti tetap sama di seluruh bahasa, ada beberapa pertimbangan dan implementasi khusus bahasa. Untuk konsep inti tentang pengembangan konsumen KCL, lihat[Kembangkan konsumen dengan KCL di Jawa](develop-kcl-consumers-java.md). Untuk informasi lebih rinci tentang cara mengembangkan konsumen KCL dengan Python, Node.js, .NET, dan Ruby dan pembaruan terbaru, silakan merujuk ke repositori berikut: GitHub 
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**penting**  
Jangan gunakan versi pustaka KCL non-Java berikut jika Anda menggunakan JDK 8. Versi ini berisi dependensi (logback) yang tidak kompatibel dengan JDK 8.  
KCL Python 3.0.2 dan 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
Sebaiknya gunakan versi yang dirilis sebelum atau sesudah versi yang terpengaruh ini saat bekerja dengan JDK 8.

# Pemrosesan multi-aliran dengan KCL
<a name="kcl-multi-stream"></a>

Bagian ini menjelaskan perubahan yang diperlukan dalam KCL yang memungkinkan Anda membuat aplikasi konsumen KCL yang dapat memproses lebih dari satu aliran data secara bersamaan.
**penting**  
Pemrosesan multi-aliran hanya didukung di KCL 2.3 atau yang lebih baru.
Pemrosesan multi-aliran *tidak* didukung untuk konsumen KCL yang ditulis dalam bahasa non-Java yang berjalan dengan. `multilangdaemon`
Pemrosesan multi-aliran *tidak* didukung dalam versi KCL 1.x apa pun.
+ **MultistreamTracker antarmuka**
  + Untuk membangun aplikasi konsumen yang dapat memproses beberapa aliran pada saat yang sama, Anda harus menerapkan antarmuka baru yang disebut [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Antarmuka ini mencakup `streamConfigList` metode yang mengembalikan daftar aliran data dan konfigurasinya untuk diproses oleh aplikasi konsumen KCL. Perhatikan bahwa aliran data yang sedang diproses dapat diubah selama runtime aplikasi konsumen. `streamConfigList`disebut secara berkala oleh KCL untuk mempelajari tentang perubahan aliran data untuk diproses.
  + Yang `streamConfigList` mengisi [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)daftar.

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `InitialPositionInStreamExtended`Bidang `StreamIdentifier` dan wajib, sementara `consumerArn` bersifat opsional. Anda harus menyediakan `consumerArn` satu-satunya jika Anda menggunakan KCL untuk menerapkan aplikasi konsumen fan-out yang disempurnakan.
  + Untuk informasi selengkapnya`StreamIdentifier`, lihat [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). Untuk membuat`StreamIdentifier`, kami sarankan Anda membuat instance multistream dari `streamArn` dan yang tersedia di KCL 2.5.0 atau `streamCreationEpoch` yang lebih baru. Di KCL v2.3 dan v2.4, yang tidak mendukung`streamArm`, buat instance multistream dengan menggunakan format. `account-id:StreamName:streamCreationTimestamp` Format ini akan usang dan tidak lagi didukung dimulai dengan rilis utama berikutnya.
  +  MultistreamTracker juga mencakup strategi untuk menghapus sewa aliran lama di tabel sewa (). formerStreamsLeases DeletionStrategy Perhatikan bahwa strategi TIDAK DAPAT diubah selama runtime aplikasi konsumen. Untuk informasi lebih lanjut, lihat [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) .java.
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)adalah kelas seluruh aplikasi yang dapat Anda gunakan untuk menentukan semua pengaturan konfigurasi KCL yang akan digunakan saat membangun aplikasi konsumen KCL Anda untuk KCL versi 2.x atau yang lebih baru. `ConfigsBuilder`kelas sekarang memiliki dukungan untuk `MultistreamTracker` antarmuka. Anda dapat menginisialisasi ConfigsBuilder baik dengan nama satu aliran data untuk menggunakan catatan dari: 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

Atau Anda dapat menginisialisasi ConfigsBuilder dengan `MultiStreamTracker` jika Anda ingin mengimplementasikan aplikasi konsumen KCL yang memproses beberapa aliran secara bersamaan.

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ Dengan dukungan multi-stream yang diterapkan untuk aplikasi konsumen KCL Anda, setiap baris tabel sewa aplikasi sekarang berisi ID pecahan dan nama aliran dari beberapa aliran data yang diproses aplikasi ini.
+ Ketika dukungan multi-stream untuk aplikasi konsumen KCL Anda diimplementasikan, LeaseKey mengambil struktur berikut:. `account-id:StreamName:streamCreationTimestamp:ShardId` Misalnya, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

**penting**  
Ketika aplikasi konsumen KCL Anda yang ada dikonfigurasi untuk memproses hanya satu aliran data, `leaseKey` (yang merupakan kunci partisi untuk tabel sewa) adalah ID pecahan. Jika Anda mengkonfigurasi ulang aplikasi konsumen KCL yang ada untuk memproses beberapa aliran data, itu merusak tabel sewa Anda, karena `leaseKey` strukturnya harus sebagai berikut: `account-id:StreamName:StreamCreationTimestamp:ShardId` untuk mendukung multi-aliran.

# Gunakan registri AWS Glue Skema dengan KCL
<a name="kcl-glue-schema"></a>

Anda dapat mengintegrasikan Kinesis Data Streams AWS Glue dengan registri Schema. Registri AWS Glue Skema memungkinkan Anda menemukan, mengontrol, dan mengembangkan skema secara terpusat, sambil memastikan data yang dihasilkan terus divalidasi oleh skema terdaftar. Sebuah skema mendefinisikan struktur dan format catatan data. Sebuah skema adalah sebuah spesifikasi berversi untuk publikasi data yang handal, konsumsi, atau penyimpanan. Registri AWS Glue Skema memungkinkan Anda meningkatkan kualitas end-to-end data dan tata kelola data dalam aplikasi streaming Anda. Untuk informasi selengkapnya, lihat [Skema Registri AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Salah satu cara untuk mengatur integrasi ini adalah melalui KCL untuk Java.

**penting**  
AWS Glue Integrasi registri skema untuk Kinesis Data Streams hanya didukung di KCL 2.3 atau yang lebih baru.
AWS Glue *Integrasi registri skema untuk Kinesis Data Streams tidak didukung untuk konsumen KCL yang ditulis dalam bahasa non-Java yang dijalankan dengan.* `multilangdaemon`
AWS Glue *Integrasi registri skema untuk Kinesis Data Streams tidak didukung dalam versi KCL 1.x apa pun.*

Untuk petunjuk rinci tentang cara mengatur integrasi Kinesis Data Streamswith AWS Glue Schema registry menggunakan KCL, lihat bagian “Berinteraksi dengan Data Menggunakan KPL/KCL Pustaka” di Kasus Penggunaan: [Mengintegrasikan Amazon Kinesis Data](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Streams dengan Schema Registry. AWS Glue 

# Izin IAM diperlukan untuk aplikasi konsumen KCL
<a name="kcl-iam-permissions"></a>

 Anda harus menambahkan izin berikut ke peran IAM atau pengguna yang terkait dengan aplikasi konsumen KCL Anda. 

 Praktik terbaik keamanan untuk AWS mendikte penggunaan izin halus untuk mengontrol akses ke sumber daya yang berbeda. AWS Identity and Access Management (IAM) memungkinkan Anda mengelola pengguna dan izin pengguna di. AWS Kebijakan IAM secara eksplisit mencantumkan tindakan yang diizinkan dan sumber daya tempat tindakan tersebut berlaku.

Tabel berikut menunjukkan izin IAM minimum yang umumnya diperlukan untuk aplikasi konsumen KCL:


**Izin IAM minimum untuk aplikasi konsumen KCL**  

| Layanan | Tindakan | Sumber daya (ARNs) | Tujuan | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  Kinesis data stream dari mana aplikasi KCL Anda akan memproses data.`arn:aws:kinesis:region:account:stream/StreamName`  |  Sebelum mencoba membaca catatan, konsumen memeriksa apakah aliran data ada, apakah aktif, dan apakah pecahan terkandung dalam aliran data. Mendaftarkan konsumen ke pecahan.  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | Kinesis data stream dari mana aplikasi KCL Anda akan memproses data.`arn:aws:kinesis:region:account:stream/StreamName` |  Membaca catatan dari pecahan.  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  Kinesis data stream dari mana aplikasi KCL Anda akan memproses data. Tambahkan tindakan ini hanya jika Anda menggunakan konsumen fan-out (EFO) yang disempurnakan. `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  Berlangganan pecahan untuk konsumen fan-out (EFO) yang disempurnakan.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Tabel sewa (tabel metadata di DynamoDB dibuat oleh KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  Tindakan ini diperlukan agar KCL dapat menggunakan tabel sewa yang dibuat di DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Metrik pekerja dan tabel status koordinator (tabel metadata di DynamoDB) dibuat oleh KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  Tindakan ini diperlukan untuk KCL untuk mengelola metrik pekerja dan tabel metadata status koordinator di DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Indeks sekunder global pada tabel sewa. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  Tindakan ini diperlukan agar KCL membaca indeks sekunder global dari tabel sewa yang dibuat di DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  Unggah metrik CloudWatch yang berguna untuk memantau aplikasi. Tanda bintang (\$1) digunakan karena tidak ada sumber daya spesifik CloudWatch di mana `PutMetricData` tindakan dipanggil.   | 

**catatan**  
Ganti “wilayah,” “akun,” “StreamName,” dan “KCLApplicationNama” ARNs dengan nama Anda sendiri Wilayah AWS, Akun AWS nomor, Kinesis data stream, dan nama aplikasi KCL masing-masing. KCL 3.x membuat dua tabel metadata lagi di DynamoDB. Untuk detail tentang tabel metadata DynamoDB yang dibuat oleh KCL, lihat. [Tabel metadata DynamoDB dan load balancing di KCL](kcl-dynamoDB.md) Jika Anda menggunakan konfigurasi untuk menyesuaikan nama tabel metadata yang dibuat oleh KCL, gunakan nama tabel yang ditentukan, bukan nama aplikasi KCL. 

Berikut ini adalah contoh dokumen kebijakan untuk aplikasi konsumen KCL. 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Sebelum Anda menggunakan kebijakan contoh ini, periksa item berikut:
+ Ganti REGION dengan Anda Wilayah AWS (misalnya, us-east-1).
+ Ganti ACCOUNT\$1ID dengan ID Anda. Akun AWS 
+ Ganti STREAM\$1NAME dengan nama aliran data Kinesis Anda.
+ Ganti CONSUMER\$1NAME dengan nama konsumen Anda, biasanya nama aplikasi Anda saat menggunakan KCL.
+ Ganti KCL\$1APPLICATION\$1NAME dengan nama aplikasi KCL Anda.

# Konfigurasi KCL
<a name="kcl-configuration"></a>

Anda dapat mengatur properti konfigurasi untuk menyesuaikan fungsionalitas Perpustakaan Klien Kinesis untuk memenuhi persyaratan spesifik Anda. Tabel berikut menjelaskan properti konfigurasi dan kelas.

**penting**  
Dalam KCL 3.x, algoritma load balancing bertujuan untuk mencapai pemanfaatan CPU yang merata di seluruh pekerja, bukan jumlah sewa yang sama per pekerja. Pengaturan `maxLeasesForWorker` terlalu rendah, Anda mungkin membatasi kemampuan KCL untuk menyeimbangkan beban kerja secara efektif. Jika Anda menggunakan `maxLeasesForWorker` konfigurasi, pertimbangkan untuk meningkatkan nilainya untuk memungkinkan distribusi beban terbaik.


**Tabel ini menunjukkan properti konfigurasi untuk KCL**  

| Properti konfigurasi | Kelas konfigurasi | Deskripsi | Nilai default | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | Nama untuk ini aplikasi KCL. Digunakan sebagai default untuk tableName danconsumerName. | Tidak berlaku | 
| tableName | ConfigsBuilder |  Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB.  | Tidak berlaku | 
| streamName | ConfigsBuilder |  Nama aliran tempat aplikasi ini memproses catatan dari.  | Tidak berlaku | 
| workerIdentifier | ConfigsBuilder |  Pengenal unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik.  | Tidak berlaku | 
| failoverTimeMillis | LeaseManagementConfig |  Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal. Untuk aplikasi yang memiliki sejumlah besar pecahan, ini dapat diatur ke angka yang lebih tinggi untuk mengurangi jumlah DynamoDB IOPS yang diperlukan untuk melacak sewa.  | 10.000 (10 detik) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  Waktu antara panggilan sinkronisasi shard.  | 60.000 (60 detik) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses.  | BETUL | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams.  | SALAH | 
| maxLeasesForWorker | LeaseManagementConfig |  Jumlah maksimum sewa yang harus diterima oleh satu pekerja. Mengaturnya terlalu rendah dapat menyebabkan kehilangan data jika pekerja tidak dapat memproses semua pecahan, dan menyebabkan penugasan sewa yang kurang optimal di antara pekerja. Pertimbangkan jumlah pecahan total, jumlah pekerja, dan kapasitas pemrosesan pekerja saat mengonfigurasinya.  | Tidak terbatas. | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya.  | 20 | 
| billingMode | LeaseManagementConfig |  Menentukan mode kapasitas tabel sewa yang dibuat di DynamoDB. Ada dua opsi: mode on-demand (PAY\$1PER\$1REQUEST) dan mode yang disediakan. Sebaiknya gunakan pengaturan default mode on-demand karena secara otomatis menskalakan untuk mengakomodasi beban kerja Anda tanpa perlu perencanaan kapasitas.  | PAY\$1PER\$1REQUEST (mode sesuai permintaan) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kapasitas baca DynamoDB yang digunakan jika Perpustakaan Klien Kinesis perlu membuat tabel sewa DynamoDB baru dengan mode kapasitas yang disediakan. Anda dapat mengabaikan konfigurasi ini jika Anda menggunakan mode kapasitas sesuai permintaan default dalam billingMode konfigurasi. | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kapasitas baca DynamoDB yang digunakan jika Perpustakaan Klien Kinesis perlu membuat tabel sewa DynamoDB baru. Anda dapat mengabaikan konfigurasi ini jika Anda menggunakan mode kapasitas sesuai permintaan default dalam billingMode konfigurasi. | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal.  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  Nilai persentase yang menentukan kapan algoritma load balancing harus mempertimbangkan untuk menetapkan kembali pecahan di antara pekerja. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  Nilai persentase yang digunakan untuk meredam jumlah beban yang akan dipindahkan dari pekerja yang kelebihan beban dalam satu operasi penyeimbangan kembali. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Menentukan apakah sewa tambahan masih perlu diambil dari pekerja yang kelebihan beban bahkan jika itu menyebabkan jumlah total throughput sewa yang diambil melebihi jumlah throughput yang diinginkan. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | BETUL | 
| disableWorkerMetrics | LeaseManagementConfig |  Menentukan apakah KCL harus mengabaikan metrik sumber daya dari pekerja (seperti pemanfaatan CPU) saat menetapkan kembali sewa dan penyeimbangan beban. Setel ini ke TRUE jika Anda ingin mencegah KCL dari load balancing berdasarkan pemanfaatan CPU. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | SALAH | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Jumlah throughput maksimum untuk ditugaskan kepada pekerja selama penugasan sewa. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | Tidak terbatas. | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Mengontrol perilaku serah terima sewa antar pekerja. Ketika disetel ke true, KCL akan mencoba untuk mentransfer sewa dengan anggun dengan memberikan waktu pecahan yang RecordProcessor cukup untuk menyelesaikan pemrosesan sebelum menyerahkan sewa kepada pekerja lain. Ini dapat membantu memastikan integritas data dan transisi yang lancar tetapi dapat meningkatkan waktu handoff. Ketika disetel ke false, sewa akan segera diserahkan tanpa menunggu RecordProcessor untuk ditutup dengan anggun. Hal ini dapat menyebabkan handoff lebih cepat tetapi dapat berisiko pemrosesan yang tidak lengkap. Catatan: Checkpointing harus diimplementasikan di dalam metode ShutdownRequested () RecordProcessor untuk mendapatkan manfaat dari fitur handoff sewa yang anggun. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | BETUL | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Menentukan waktu minimum (dalam milidetik) untuk menunggu pecahan saat ini ditutup dengan anggun sebelum secara paksa mentransfer sewa ke pemilik berikutnya. RecordProcessor  Jika metode ProcessRecords Anda biasanya berjalan lebih lama dari nilai default, pertimbangkan untuk meningkatkan setelan ini. Ini memastikan RecordProcessor memiliki waktu yang cukup untuk menyelesaikan pemrosesannya sebelum transfer sewa terjadi. Ini adalah konfigurasi baru yang diperkenalkan di KCL 3.x.  | 30.000 (30 detik) | 
| maxRecords | PollingConfig |  Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan.  | 10.000 | 
| retryGetRecordsInSeconds | PollingConfig |  Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan.  | Tidak ada | 
| maxGetRecordsThreadPool | PollingConfig |  Ukuran kolam benang yang digunakan untuk GetRecords.  | Tidak ada | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Menentukan berapa lama KCL menunggu di antara GetRecords panggilan untuk polling data dari aliran data. Satuannya adalah milidetik.  | 1.500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis.  | SALAH | 
| parentShardPollIntervalMillis | CoordinatorConfig |  Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai. Satuannya adalah milidetik.  | 10.000 (10 detik) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada.  |  SALAH  | 
| shardPrioritization | CoordinatorConfig |  Prioritas pecahan mana yang akan digunakan.  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  Menentukan mode kompatibilitas versi KCL mana aplikasi akan berjalan. Konfigurasi ini hanya untuk migrasi dari versi KCL sebelumnya. Saat bermigrasi ke 3.x, Anda perlu mengatur konfigurasi ini ke. `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` Anda dapat menghapus konfigurasi ini saat Anda menyelesaikan migrasi.  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  Waktu menunggu untuk mencoba lagi tugas KCL gagal. Satuannya adalah milidetik.  | 500 (0,5 detik) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai.  | Tidak ada | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Jumlah milidetik untuk menunggu di antara panggilan ke ListShards saat kegagalan terjadi. Satuannya adalah milidetik. | 1.500 (1,5 detik) | 
| maxListShardsRetryAttempts | RetrievalConfig | Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Menentukan durasi maksimum (dalam milidetik) untuk menyangga metrik sebelum mempublikasikannya. CloudWatch  | 10.000 (10 detik) | 
| metricsMaxQueueSize | MetricsConfig |  Menentukan jumlah maksimum metrik untuk buffer sebelum dipublikasikan ke. CloudWatch  | 10.000 | 
| metricsLevel | MetricsConfig |  Menentukan tingkat granularitas CloudWatch metrik yang akan diaktifkan dan dipublikasikan.  Nilai yang mungkin: TIDAK ADA, RINGKASAN, DETAIL.  |  MetricsLevel.RINCI  | 
| metricsEnabledDimensions | MetricsConfig |  Kontrol memungkinkan dimensi untuk CloudWatch Metrik.  | Semua dimensi | 

**Konfigurasi yang dihentikan di KCL 3.x**

Properti konfigurasi berikut dihentikan di KCL 3.x:


**Tabel menunjukkan properti konfigurasi yang dihentikan untuk KCL 3.x**  

| Properti konfigurasi | Kelas konfigurasi | Deskripsi | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu. KCL 3.x akan mengabaikan konfigurasi ini dan menetapkan kembali sewa berdasarkan pemanfaatan sumber daya pekerja.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Mengontrol apakah pekerja harus memprioritaskan pengambilan sewa yang sangat kedaluwarsa (sewa tidak diperpanjang untuk 3x waktu failover) dan sewa pecahan baru, terlepas dari jumlah sewa target tetapi masih menghormati batas sewa maksimum. KCL 3.x akan mengabaikan konfigurasi ini dan selalu menyebarkan sewa kedaluwarsa ke seluruh pekerja.  | 

**penting**  
Anda masih harus memiliki properti konfigurasi yang tidak sesuai selama migrasi dari versi KCL sebelumnya ke KCL 3.x. Selama migrasi, pekerja KCL pertama-tama akan memulai dengan mode kompatibel KCL 2.x dan beralih ke mode fungsionalitas KCL 3.x ketika mendeteksi bahwa semua pekerja KCL aplikasi siap menjalankan KCL 3.x. Konfigurasi yang dihentikan ini diperlukan saat pekerja KCL menjalankan mode yang kompatibel dengan KCL 2.x.

# Kebijakan siklus hidup versi KCL
<a name="kcl-version-lifecycle-policy"></a>

Topik ini menguraikan kebijakan siklus hidup versi untuk Amazon Kinesis Client Library (KCL). AWS secara teratur menyediakan rilis baru untuk versi KCL untuk mendukung fitur dan penyempurnaan baru, perbaikan bug, patch keamanan, dan pembaruan ketergantungan. Kami menyarankan Anda tetap up-to-date menggunakan versi KCL untuk mengikuti fitur-fitur terbaru, pembaruan keamanan, dan dependensi yang mendasarinya. Kami **tidak** menyarankan untuk terus menggunakan versi KCL yang tidak didukung.

Siklus hidup untuk versi KCL utama terdiri dari tiga fase berikut:
+ **Ketersediaan umum (GA)** — Selama fase ini, versi utama didukung sepenuhnya. AWS menyediakan rilis versi minor dan patch reguler yang mencakup dukungan untuk fitur baru atau pembaruan API untuk Kinesis Data Streams, serta perbaikan bug dan keamanan.
+ **Mode pemeliharaan** — AWS membatasi rilis versi patch untuk mengatasi perbaikan bug kritis dan masalah keamanan saja. Versi utama tidak akan menerima pembaruan untuk fitur baru atau APIs Kinesis Data Streams.
+ **E nd-of-support** — Versi utama tidak akan lagi menerima pembaruan atau rilis. Rilis yang diterbitkan sebelumnya akan terus tersedia melalui manajer paket publik dan kode akan tetap aktif GitHub. Penggunaan versi yang telah end-of-support dicapai dilakukan atas kebijaksanaan pengguna. Kami menyarankan Anda meningkatkan ke versi utama terbaru.


| Versi utama | Fase saat ini | Tanggal rilis | Tanggal mode pemeliharaan | End-of-support tanggal | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | Mode pemeliharaan | 2013-12-19 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | Ketersediaan umum | 2018-08-02 | -- | -- | 
| KCL 3.x | Ketersediaan umum | 2024-11-06 | -- | -- | 

# Migrasi dari versi KCL sebelumnya
<a name="kcl-migration-previous-versions"></a>

Topik ini menjelaskan cara bermigrasi dari versi Kinesis Client Library (KCL) sebelumnya. 

## Apa yang baru KCL 3.0?
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 memperkenalkan beberapa perangkat tambahan utama dibandingkan dengan versi sebelumnya:
+  Ini menurunkan biaya komputasi untuk aplikasi konsumen dengan secara otomatis mendistribusikan kembali pekerjaan dari pekerja yang terlalu dimanfaatkan ke pekerja yang kurang dimanfaatkan dalam armada aplikasi konsumen. Algoritma load balancing baru ini memastikan pemanfaatan CPU yang didistribusikan secara merata di seluruh pekerja dan menghilangkan kebutuhan untuk menyediakan pekerja yang berlebihan.
+  Ini mengurangi biaya DynamoDB yang terkait dengan KCL dengan mengoptimalkan operasi baca pada tabel sewa.
+ Ini meminimalkan pemrosesan ulang data ketika sewa dipindahkan ke pekerja lain dengan memungkinkan pekerja saat ini menyelesaikan pemeriksaan catatan yang telah diproses.
+  Ini digunakan AWS SDK for Java 2.x untuk meningkatkan kinerja dan fitur keamanan, sepenuhnya menghapus ketergantungan pada AWS SDK untuk Java 1.x.

Untuk informasi selengkapnya, lihat [catatan rilis KCL 3.0](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [Apa yang baru KCL 3.0?](#kcl-migration-new-3-0)
+ [Migrasi dari KCL 2.x ke KCL 3.x](kcl-migration-from-2-3.md)
+ [Putar kembali ke versi KCL sebelumnya](kcl-migration-rollback.md)
+ [Gulung maju ke KCL 3.x setelah rollback](kcl-migration-rollforward.md)
+ [Praktik terbaik untuk tabel sewa dengan mode kapasitas yang disediakan](kcl-migration-lease-table.md)
+ [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

# Migrasi dari KCL 2.x ke KCL 3.x
<a name="kcl-migration-from-2-3"></a>

Topik ini memberikan step-by-step petunjuk untuk memigrasikan konsumen Anda dari KCL 2.x ke KCL 3.x. KCL 3.x mendukung migrasi di tempat konsumen KCL 2.x. Anda dapat terus mengkonsumsi data dari aliran data Kinesis Anda sambil memigrasikan pekerja Anda secara bergulir.

**penting**  
KCL 3.x mempertahankan antarmuka dan metode yang sama seperti KCL 2.x. Oleh karena itu, Anda tidak perlu memperbarui kode pemrosesan catatan selama migrasi. Namun, Anda harus mengatur konfigurasi yang tepat dan memeriksa langkah-langkah yang diperlukan untuk migrasi. Kami sangat menyarankan Anda mengikuti langkah-langkah migrasi berikut untuk pengalaman migrasi yang lancar.

## Langkah 1: Prasyarat
<a name="kcl-migration-from-2-3-prerequisites"></a>

Sebelum Anda mulai menggunakan KCL 3.x, pastikan Anda memiliki yang berikut:
+ Java Development Kit (JDK) 8 atau lebih baru
+ AWS SDK untuk Java 2.x
+ Maven atau Gradle untuk manajemen ketergantungan

**penting**  
Jangan gunakan AWS SDK untuk Java versi 2.27.19 hingga 2.27.23 dengan KCL 3.x. Versi ini menyertakan masalah yang menyebabkan kesalahan pengecualian terkait dengan penggunaan DynamoDB KCL. Kami menyarankan Anda menggunakan AWS SDK untuk Java versi 2.28.0 atau yang lebih baru untuk menghindari masalah ini. 

## Langkah 2: Tambahkan dependensi
<a name="kcl-migration-from-2-3-dependencies"></a>

Jika Anda menggunakan Maven, tambahkan dependensi berikut ke file Anda. `pom.xml` Pastikan Anda mengganti 3.xx ke versi KCL terbaru. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Jika Anda menggunakan Gradle, tambahkan berikut ini ke `build.gradle` file Anda. Pastikan Anda mengganti 3.xx ke versi KCL terbaru. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Anda dapat memeriksa versi terbaru KCL di Repositori Pusat [Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Langkah 3: Siapkan konfigurasi terkait migrasi
<a name="kcl-migration-from-2-3-configuration"></a>

Untuk bermigrasi dari KCL 2.x ke KCL 3.x, Anda harus mengatur parameter konfigurasi berikut:
+ CoordinatorConfig. clientVersionConfig: Konfigurasi ini menentukan mode kompatibilitas versi KCL mana aplikasi akan berjalan. Saat bermigrasi dari KCL 2.x ke 3.x, Anda perlu mengatur konfigurasi ini ke. `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` Untuk mengatur konfigurasi ini, tambahkan baris berikut saat membuat objek scheduler Anda:

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

Berikut ini adalah contoh cara mengatur `CoordinatorConfig.clientVersionConfig` untuk bermigrasi dari KCL 2.x ke 3.x. Anda dapat menyesuaikan konfigurasi lain sesuai kebutuhan berdasarkan kebutuhan spesifik Anda:

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

Sangat penting bahwa semua pekerja dalam aplikasi konsumen Anda menggunakan algoritma load balancing yang sama pada waktu tertentu karena KCL 2.x dan 3.x menggunakan algoritma load balancing yang berbeda. Menjalankan pekerja dengan algoritma load balancing yang berbeda dapat menyebabkan distribusi beban suboptimal karena kedua algoritma beroperasi secara independen.

Pengaturan kompatibilitas KCL 2.x ini memungkinkan aplikasi KCL 3.x Anda berjalan dalam mode yang kompatibel dengan KCL 2.x dan menggunakan algoritma load balancing untuk KCL 2.x hingga semua pekerja di aplikasi konsumen Anda telah ditingkatkan ke KCL 3.x. Ketika migrasi selesai, KCL akan secara otomatis beralih ke mode fungsionalitas KCL 3.x penuh dan mulai menggunakan algoritma penyeimbangan beban KCL 3.x baru untuk semua pekerja yang sedang berjalan.

**penting**  
Jika Anda tidak menggunakan `ConfigsBuilder` tetapi membuat `LeaseManagementConfig` objek untuk mengatur konfigurasi, Anda harus menambahkan satu parameter lagi yang disebut `applicationName` dalam KCL versi 3.x atau yang lebih baru. Untuk detailnya, lihat [Kesalahan kompilasi dengan LeaseManagementConfig konstruktor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig). Kami merekomendasikan penggunaan `ConfigsBuilder` untuk mengatur konfigurasi KCL. `ConfigsBuilder`menyediakan cara yang lebih fleksibel dan dapat dipelihara untuk mengkonfigurasi aplikasi KCL Anda.

## Langkah 4: Ikuti praktik terbaik untuk implementasi metode ShutdownRequested ()
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x memperkenalkan fitur yang disebut *handoff sewa anggun* untuk meminimalkan pemrosesan ulang data ketika sewa diserahkan kepada pekerja lain sebagai bagian dari proses penggantian sewa. Ini dicapai dengan memeriksa nomor urut yang diproses terakhir di tabel sewa sebelum serah terima sewa. Untuk memastikan handoff sewa anggun berfungsi dengan baik, Anda harus memastikan bahwa Anda memanggil `checkpointer` objek dalam metode di kelas Anda. `shutdownRequested` `RecordProcessor` Jika Anda tidak memanggil `checkpointer` objek dalam `shutdownRequested` metode, Anda dapat menerapkannya seperti yang diilustrasikan dalam contoh berikut. 

**penting**  
Contoh implementasi berikut adalah persyaratan minimal untuk handoff sewa yang anggun. Anda dapat memperluasnya untuk menyertakan logika tambahan yang terkait dengan pos pemeriksaan jika diperlukan. Jika Anda melakukan pemrosesan asinkron, pastikan semua catatan yang dikirimkan ke hilir diproses sebelum menjalankan checkpointing. 
Sementara handoff sewa yang anggun secara signifikan mengurangi kemungkinan pemrosesan ulang data selama transfer sewa, itu tidak sepenuhnya menghilangkan kemungkinan ini. Untuk menjaga integritas dan konsistensi data, rancang aplikasi konsumen hilir Anda menjadi idempoten. Ini berarti mereka harus dapat menangani pemrosesan rekaman duplikat potensial tanpa efek buruk pada keseluruhan sistem.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## Langkah 5: Periksa prasyarat KCL 3.x untuk mengumpulkan metrik pekerja
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x mengumpulkan metrik pemanfaatan CPU seperti pemanfaatan CPU dari pekerja untuk menyeimbangkan beban di seluruh pekerja secara merata. Pekerja aplikasi konsumen dapat berjalan di Amazon EC2, Amazon ECS, Amazon EKS, atau. AWS Fargate KCL 3.x dapat mengumpulkan metrik pemanfaatan CPU dari pekerja hanya jika prasyarat berikut terpenuhi:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Sistem operasi Anda harus OS Linux.
+ Anda harus mengaktifkan [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)di instans EC2 Anda.

 **Amazon Elastic Container Service (Amazon ECS) di Amazon EC2**
+ Sistem operasi Anda harus OS Linux.
+ Anda harus mengaktifkan titik [akhir metadata tugas ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html) versi 4. 
+ Versi agen penampung Amazon ECS Anda harus 1.39.0 atau yang lebih baru.

 **Amazon ECS aktif AWS Fargate**
+ Anda harus mengaktifkan titik [akhir metadata tugas Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) versi 4. Jika Anda menggunakan platform Fargate versi 1.4.0 atau yang lebih baru, ini diaktifkan secara default. 
+ Platform Fargate versi 1.4.0 atau yang lebih baru.

 **Layanan Amazon Elastic Kubernetes (Amazon EKS) di Amazon EC2** 
+ Sistem operasi Anda harus OS Linux.

 **Amazon EKS di AWS Fargate**
+ Platform Fargate 1.3.0 atau yang lebih baru.

**penting**  
Jika KCL 3.x tidak dapat mengumpulkan metrik pemanfaatan CPU dari pekerja karena prasyarat tidak terpenuhi, itu akan menyeimbangkan kembali beban tingkat throughput per sewa. Mekanisme penyeimbangan kembali ini akan memastikan semua pekerja akan mendapatkan tingkat throughput total yang sama dari sewa yang diberikan kepada setiap pekerja. Untuk informasi selengkapnya, lihat [Bagaimana KCL memberikan sewa kepada pekerja dan menyeimbangkan beban](kcl-dynamoDB.md#kcl-assign-leases).

## Langkah 6: Perbarui izin IAM untuk KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

Anda harus menambahkan izin berikut ke peran atau kebijakan IAM yang terkait dengan aplikasi konsumen KCL 3.x Anda. Ini melibatkan memperbarui kebijakan IAM yang ada yang digunakan oleh aplikasi KCL. Untuk informasi selengkapnya, lihat [Izin IAM diperlukan untuk aplikasi konsumen KCL](kcl-iam-permissions.md).

**penting**  
Aplikasi KCL Anda yang ada mungkin tidak memiliki tindakan dan sumber daya IAM berikut yang ditambahkan dalam kebijakan IAM karena tidak diperlukan di KCL 2.x. Pastikan Anda telah menambahkannya sebelum menjalankan aplikasi KCL 3.x Anda:  
Tindakan: `UpdateTable`  
Sumber daya (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
Tindakan: `Query`  
Sumber daya (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
Tindakan:`CreateTable`,`DescribeTable`,`Scan`,`GetItem`,`PutItem`,`UpdateItem`, `DeleteItem`  
Sumber daya (ARNs):`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
Ganti “wilayah,” “akun,” dan “KCLApplicationNama” di ARNs dengan nama aplikasi Anda sendiri Wilayah AWS, Akun AWS nomor, dan KCL masing-masing. Jika Anda menggunakan konfigurasi untuk menyesuaikan nama tabel metadata yang dibuat oleh KCL, gunakan nama tabel yang ditentukan, bukan nama aplikasi KCL.

## Langkah 7: Menyebarkan kode KCL 3.x ke pekerja Anda
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

Setelah Anda mengatur konfigurasi yang diperlukan untuk migrasi dan menyelesaikan semua daftar periksa migrasi sebelumnya, Anda dapat membuat dan menerapkan kode Anda ke pekerja Anda.

**catatan**  
Jika Anda melihat kesalahan kompilasi dengan `LeaseManagementConfig` konstruktor, lihat [Kesalahan kompilasi dengan LeaseManagementConfig konstruktor untuk informasi](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig) pemecahan masalah.

## Langkah 8: Selesaikan migrasi
<a name="kcl-migration-from-2-3-finish"></a>

Selama penyebaran kode KCL 3.x, KCL terus menggunakan algoritma penetapan sewa dari KCL 2.x. Ketika Anda telah berhasil menerapkan kode KCL 3.x ke semua pekerja Anda, KCL secara otomatis mendeteksi ini dan beralih ke algoritme penetapan sewa baru berdasarkan pemanfaatan sumber daya pekerja. Untuk detail selengkapnya tentang algoritme penetapan sewa baru, lihat. [Bagaimana KCL memberikan sewa kepada pekerja dan menyeimbangkan beban](kcl-dynamoDB.md#kcl-assign-leases)

Selama penerapan, Anda dapat memantau proses migrasi dengan metrik berikut yang dipancarkan. CloudWatch Anda dapat memantau metrik di bawah `Migration` operasi. Semua metrik adalah per-KCL-application metrik dan disetel ke tingkat `SUMMARY` metrik. Jika `Sum` statistik `CurrentState:3xWorker` metrik cocok dengan jumlah total pekerja dalam aplikasi KCL Anda, ini menunjukkan bahwa migrasi ke KCL 3.x telah berhasil diselesaikan.

**penting**  
 Dibutuhkan setidaknya 10 menit bagi KCL untuk beralih ke algoritme penugasan leasee baru setelah semua pekerja siap menjalankannya.


**CloudWatch metrik untuk proses migrasi KCL**  

| Metrik-metrik | Deskripsi | 
| --- | --- | 
| CurrentState:3xWorker |  Jumlah pekerja KCL berhasil bermigrasi ke KCL 3.x dan menjalankan algoritma penugasan sewa baru. Jika `Sum` hitungan metrik ini cocok dengan jumlah total pekerja Anda, ini menunjukkan bahwa migrasi ke KCL 3.x telah berhasil diselesaikan. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  Jumlah pekerja KCL yang berjalan dalam mode kompatibel KCL 2.x selama proses migrasi. Nilai bukan nol untuk metrik ini menunjukkan bahwa migrasi masih berlangsung. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  Jumlah pengecualian yang ditemui selama proses migrasi. Sebagian besar pengecualian ini adalah kesalahan sementara, dan KCL 3.x akan secara otomatis mencoba lagi untuk menyelesaikan migrasi. Jika Anda mengamati nilai `Fault` metrik persisten, tinjau log Anda dari periode migrasi untuk pemecahan masalah lebih lanjut. Jika masalah berlanjut, hubungi Dukungan. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  Status penciptaan indeks sekunder global (GSI) pada tabel sewa. Metrik ini menunjukkan apakah GSI pada tabel sewa telah dibuat, prasyarat untuk menjalankan KCL 3.x. Nilainya adalah 0 atau 1, dengan 1 menunjukkan penciptaan yang berhasil. Selama keadaan rollback, metrik ini tidak akan dipancarkan. Setelah Anda maju lagi, Anda dapat melanjutkan pemantauan metrik ini. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  Status emisi metrik pekerja dari semua pekerja. Metrik menunjukkan apakah semua pekerja memancarkan metrik seperti pemanfaatan CPU. Nilainya adalah 0 atau 1, dengan 1 menunjukkan semua pekerja berhasil memancarkan metrik dan siap untuk algoritme penetapan sewa baru. Selama keadaan rollback, metrik ini tidak akan dipancarkan. Setelah Anda maju lagi, Anda dapat melanjutkan pemantauan metrik ini. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/kcl-migration-from-2-3.html)  | 

KCL menyediakan kemampuan rollback ke mode kompatibel 2.x selama migrasi. Setelah migrasi berhasil ke KCL 3.x berhasil, kami sarankan Anda menghapus `CoordinatorConfig.clientVersionConfig` pengaturan `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` jika rollback tidak lagi diperlukan. Menghapus konfigurasi ini menghentikan emisi metrik terkait migrasi dari aplikasi KCL.

**catatan**  
Sebaiknya Anda memantau kinerja dan stabilitas aplikasi selama suatu periode selama migrasi dan setelah menyelesaikan migrasi. [Jika Anda mengamati masalah apa pun, Anda dapat mengembalikan pekerja untuk menggunakan fungsionalitas yang kompatibel dengan KCL 2.x menggunakan Alat Migrasi KCL.](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

# Putar kembali ke versi KCL sebelumnya
<a name="kcl-migration-rollback"></a>

Topik ini menjelaskan langkah-langkah untuk mengembalikan konsumen Anda kembali ke versi sebelumnya. Ketika Anda perlu memutar kembali, ada proses dua langkah: 

1. Jalankan [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Menerapkan ulang kode versi KCL sebelumnya (opsional).

## Langkah 1: Jalankan Alat Migrasi KCL
<a name="kcl-migration-rollback-tool"></a>

Ketika Anda perlu memutar kembali ke versi KCL sebelumnya, Anda harus menjalankan Alat Migrasi KCL. Alat Migrasi KCL melakukan dua tugas penting:
+ Ini menghapus tabel metadata yang disebut tabel metrik pekerja dan indeks sekunder global pada tabel sewa di DynamoDB. Kedua artefak ini dibuat oleh KCL 3.x tetapi tidak diperlukan saat Anda memutar kembali ke versi sebelumnya.
+ Itu membuat semua pekerja berjalan dalam mode yang kompatibel dengan KCL 2.x dan mulai menggunakan algoritma load balancing yang digunakan dalam versi KCL sebelumnya. Jika Anda memiliki masalah dengan algoritme penyeimbangan beban baru di KCL 3.x, ini akan segera mengurangi masalah.

**penting**  
Tabel status koordinator di DynamoDB harus ada dan tidak boleh dihapus selama proses migrasi, rollback, dan rollforward. 

**catatan**  
Sangat penting bahwa semua pekerja dalam aplikasi konsumen Anda menggunakan algoritma load balancing yang sama pada waktu tertentu. Alat Migrasi KCL memastikan bahwa semua pekerja di aplikasi konsumen KCL 3.x Anda beralih ke mode yang kompatibel dengan KCL 2.x sehingga semua pekerja menjalankan algoritme penyeimbangan beban yang sama selama pelunasan bergulir kembali ke versi KCL Anda sebelumnya.

Anda dapat mengunduh [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) di direktori skrip repositori [ GitHubKCL](https://github.com/awslabs/amazon-kinesis-client/tree/master). Skrip dapat dijalankan dari pekerja Anda atau host mana pun yang memiliki izin yang diperlukan untuk menulis ke tabel status koordinator, menghapus tabel metrik pekerja, dan memperbarui tabel sewa. Anda dapat merujuk ke izin IAM yang diperlukan [Izin IAM diperlukan untuk aplikasi konsumen KCL](kcl-iam-permissions.md) untuk menjalankan skrip. Anda harus menjalankan skrip hanya sekali per aplikasi KCL. Anda dapat menjalankan KCL Migration Tool dengan perintah berikut: 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Parameter**
+ --region: Ganti `<region>` dengan Anda Wilayah AWS.
+ --application\$1name: Parameter ini diperlukan jika Anda menggunakan nama default untuk tabel metadata DynamoDB Anda (tabel sewa, tabel status koordinator, dan tabel metrik pekerja). Jika Anda telah menentukan nama kustom untuk tabel ini, Anda dapat menghilangkan parameter ini. Ganti `<applicationName>` dengan nama aplikasi KCL Anda yang sebenarnya. Alat ini menggunakan nama ini untuk mendapatkan nama tabel default jika nama kustom tidak disediakan.
+ --lease\$1table\$1name (opsional): Parameter ini diperlukan ketika Anda telah menetapkan nama khusus untuk tabel sewa dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti `leaseTableName` dengan nama tabel kustom yang Anda tentukan untuk tabel sewa Anda.
+ --coordinator\$1state\$1table\$1name (opsional): Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel status koordinator dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti `<coordinatorStateTableName>` dengan nama tabel kustom yang Anda tentukan untuk tabel status koordinator Anda. 
+ --worker\$1metrics\$1table\$1name (opsional): Parameter ini diperlukan ketika Anda telah menetapkan nama khusus untuk tabel metrik pekerja dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti `<workerMetricsTableName>` dengan nama tabel kustom yang Anda tentukan untuk tabel metrik pekerja Anda. 

## Langkah 2: Menerapkan ulang kode dengan versi KCL sebelumnya (opsional)
<a name="kcl-migration-rollback-redeploy"></a>

 Setelah menjalankan KCL Migration Tool untuk rollback, Anda akan melihat salah satu pesan berikut:
+ **Pesan 1:** “Rollback selesai. Aplikasi KCL Anda menjalankan mode yang kompatibel dengan KCL 2.x. Jika Anda tidak melihat mitigasi regresi apa pun, harap kembalikan ke binari aplikasi Anda sebelumnya dengan menerapkan kode dengan versi KCL Anda sebelumnya.
  + **Tindakan yang diperlukan:** Ini berarti pekerja Anda berjalan dalam mode yang kompatibel dengan KCL 2.x. Jika masalah berlanjut, terapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.
+ **Pesan 2:** “Rollback selesai. Aplikasi KCL Anda menjalankan mode fungsionalitas KCL 3.x. Rollback ke binari aplikasi sebelumnya tidak diperlukan, kecuali jika Anda tidak melihat mitigasi untuk masalah ini dalam waktu 5 menit. Jika Anda masih memiliki masalah, harap kembalikan ke binari aplikasi Anda sebelumnya dengan menerapkan kode dengan versi KCL Anda sebelumnya.
  + **Tindakan yang diperlukan:** Ini berarti pekerja Anda berjalan dalam mode KCL 3.x dan Alat Migrasi KCL mengalihkan semua pekerja ke mode yang kompatibel dengan KCL 2.x. Jika masalah teratasi, Anda tidak perlu menerapkan ulang kode dengan versi KCL sebelumnya. Jika masalah berlanjut, terapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

 

# Gulung maju ke KCL 3.x setelah rollback
<a name="kcl-migration-rollforward"></a>

Topik ini menjelaskan langkah-langkah untuk memajukan konsumen Anda kembali ke KCL 3.x setelah rollback. Ketika Anda perlu maju, Anda harus melalui proses dua langkah: 

1. Jalankan [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). 

1. Terapkan kode dengan KCL 3.x.

## Langkah 1: Jalankan Alat Migrasi KCL
<a name="kcl-migration-rollback-tool"></a>

Jalankan Alat Migrasi KCL. Alat Migrasi KCL dengan perintah berikut untuk maju ke KCL 3.x:

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Parameter**
+ --region: Ganti `<region>` dengan Anda Wilayah AWS.
+ --application\$1name: Parameter ini diperlukan jika Anda menggunakan nama default untuk tabel status koordinator Anda. Jika Anda telah menentukan nama kustom untuk tabel status koordinator, Anda dapat menghilangkan parameter ini. Ganti `<applicationName>` dengan nama aplikasi KCL Anda yang sebenarnya. Alat ini menggunakan nama ini untuk mendapatkan nama tabel default jika nama kustom tidak disediakan.
+ --coordinator\$1state\$1table\$1name (opsional): Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel status koordinator dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti `<coordinatorStateTableName>` dengan nama tabel kustom yang Anda tentukan untuk tabel status koordinator Anda. 

Setelah Anda menjalankan alat migrasi dalam mode roll-forward, KCL membuat sumber daya DynamoDB berikut yang diperlukan untuk KCL 3.x:
+ Indeks Sekunder Global pada tabel sewa
+ Tabel metrik pekerja

## Langkah 2: Menyebarkan kode dengan KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

Setelah menjalankan KCL Migration Tool untuk maju, terapkan kode Anda dengan KCL 3.x ke pekerja Anda. Ikuti [Langkah 8: Selesaikan migrasi](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) untuk menyelesaikan migrasi Anda.

# Praktik terbaik untuk tabel sewa dengan mode kapasitas yang disediakan
<a name="kcl-migration-lease-table"></a>

Jika tabel sewa aplikasi KCL Anda dialihkan ke mode kapasitas yang disediakan, KCL 3.x membuat indeks sekunder global pada tabel sewa dengan mode penagihan yang disediakan dan unit kapasitas baca (RCU) yang sama dan unit kapasitas tulis (WCU) sebagai tabel sewa dasar. Saat indeks sekunder global dibuat, sebaiknya Anda memantau penggunaan aktual pada indeks sekunder global di konsol DynamoDB dan menyesuaikan unit kapasitas jika diperlukan. Untuk panduan lebih rinci tentang beralih mode kapasitas tabel metadata DynamoDB yang dibuat oleh KCL, lihat. [Mode kapasitas DynamoDB untuk tabel metadata yang dibuat oleh KCL](kcl-dynamoDB.md#kcl-capacity-mode) 

**catatan**  
Secara default, KCL membuat tabel metadata seperti tabel sewa, tabel metrik pekerja, dan tabel status koordinator, dan indeks sekunder global pada tabel sewa menggunakan mode kapasitas sesuai permintaan. Kami menyarankan Anda menggunakan mode kapasitas sesuai permintaan untuk menyesuaikan kapasitas secara otomatis berdasarkan perubahan penggunaan Anda. 

# Migrasi dari KCL 1.x ke KCL 3.x
<a name="kcl-migration-1-3"></a>

Topik ini menjelaskan petunjuk untuk memigrasikan konsumen Anda dari KCL 1.x ke KCL 3.x. KCL 1.x menggunakan kelas dan antarmuka yang berbeda dibandingkan dengan KCL 2.x dan KCL 3.x. Anda harus memigrasikan prosesor rekaman, pabrik prosesor rekaman, dan kelas pekerja ke format yang kompatibel dengan KCL 2.x/3.x terlebih dahulu, dan ikuti langkah-langkah migrasi untuk migrasi KCL 2.x ke KCL 3.x. Anda dapat langsung meningkatkan dari KCL 1.x ke KCL 3.x.
+ **Langkah 1: Migrasikan prosesor rekaman**

  Ikuti bagian [Migrasi prosesor rekaman di](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) halaman [Migrasi konsumen dari KCL 1.x ke KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Langkah 2: Migrasikan pabrik prosesor rekaman**

  Ikuti bagian [Migrasi pabrik prosesor rekaman](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) di halaman [Migrasi konsumen dari KCL 1.x ke KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Langkah 3: Migrasikan pekerja**

  Ikuti bagian [Migrasi pekerja di halaman](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) [Migrasi konsumen dari KCL 1.x ke KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Langkah 4: Migrasikan konfigurasi KCL 1.x**

  Ikuti bagian [Konfigurasi klien Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) di halaman [Migrasi konsumen dari KCL 1.x ke KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Langkah 5: Periksa penghapusan waktu idle dan penghapusan konfigurasi klien**

  Ikuti bagian [penghapusan waktu Idle dan penghapusan](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) [konfigurasi Klien](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) di halaman [Migrasikan konsumen dari KCL 1.x ke KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Langkah 6: Ikuti step-by-step petunjuk dalam panduan migrasi KCL 2.x ke KCL 3.x**

  Ikuti petunjuk di [Migrasi dari KCL 2.x ke KCL 3.x](kcl-migration-from-2-3.md) halaman untuk menyelesaikan migrasi. Jika Anda perlu memutar kembali ke versi KCL sebelumnya atau maju ke KCL 3.x setelah rollback, lihat dan. [Putar kembali ke versi KCL sebelumnya](kcl-migration-rollback.md) [Gulung maju ke KCL 3.x setelah rollback](kcl-migration-rollforward.md)

**penting**  
Jangan gunakan AWS SDK untuk Java versi 2.27.19 hingga 2.27.23 dengan KCL 3.x. Versi ini menyertakan masalah yang menyebabkan kesalahan pengecualian terkait dengan penggunaan DynamoDB KCL. Kami menyarankan Anda menggunakan AWS SDK untuk Java versi 2.28.0 atau yang lebih baru untuk menghindari masalah ini. 

# Dokumentasi versi KCL sebelumnya
<a name="kcl-archive"></a>

Topik-topik berikut telah diarsipkan. Untuk melihat dokumentasi Perpustakaan Klien Kinesis saat ini, lihat. [Gunakan Perpustakaan Klien Kinesis](kcl.md)

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

**Topics**
+ [Informasi KCL 1.x dan 2.x](shared-throughput-kcl-consumers.md)
+ [Kembangkan konsumen khusus dengan throughput bersama](shared-throughput-consumers.md)
+ [Migrasikan konsumen dari KCL 1.x ke KCL 2.x](kcl-migration.md)

# Informasi KCL 1.x dan 2.x
<a name="shared-throughput-kcl-consumers"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Salah satu metode pengembangan aplikasi konsumen kustom yang dapat memproses data dari aliran data KDS adalah dengan menggunakan Kinesis Client Library (KCL).

**Topics**
+ [Tentang KCL (versi sebelumnya)](#shared-throughput-kcl-consumers-overview)
+ [KCL versi sebelumnya](#shared-throughput-kcl-consumers-versions)
+ [Konsep KCL (versi sebelumnya)](#shared-throughput-kcl-consumers-concepts)
+ [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](#shared-throughput-kcl-consumers-leasetable)
+ [Memproses beberapa aliran data dengan KCL 2.x yang sama untuk aplikasi konsumen Java](#shared-throughput-kcl-multistream)
+ [Gunakan KCL dengan AWS Glue Schema Registry](#shared-throughput-kcl-consumers-glue-schema-registry)

**catatan**  
Untuk KCL 1.x dan KCL 2.x, Anda disarankan untuk meningkatkan ke versi KCL 1.x terbaru atau versi KCL 2.x, tergantung pada skenario penggunaan Anda. Baik KCL 1.x dan KCL 2.x diperbarui secara berkala dengan rilis yang lebih baru yang mencakup dependensi dan patch keamanan terbaru, perbaikan bug, dan fitur baru yang kompatibel ke belakang. Untuk informasi selengkapnya, lihat [https://github.com/awslabs/amazon-kinesis-client/release](https://github.com/awslabs/amazon-kinesis-client/releases).

## Tentang KCL (versi sebelumnya)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL membantu Anda mengkonsumsi dan memproses data dari aliran data Kinesis dengan menangani banyak tugas kompleks yang terkait dengan komputasi terdistribusi. Ini termasuk load balancing di beberapa instance aplikasi konsumen, menanggapi kegagalan instans aplikasi konsumen, memeriksa catatan yang diproses, dan bereaksi terhadap resharding. KCL menangani semua subtugas ini sehingga Anda dapat memfokuskan upaya Anda untuk menulis logika pemrosesan catatan khusus Anda.

KCL berbeda dari Kinesis Data APIs Streams yang tersedia di. AWS SDKs Kinesis APIs Data Streams membantu Anda mengelola banyak aspek Kinesis Data Streams, termasuk membuat stream, resharding, dan menempatkan serta mendapatkan rekaman. KCL menyediakan lapisan abstraksi di sekitar semua subtugas ini, khususnya sehingga Anda dapat fokus pada logika pemrosesan data kustom aplikasi konsumen Anda. Untuk informasi tentang Kinesis Data Streams API, lihat Referensi API [Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).

**penting**  
KCL adalah perpustakaan Java. Support untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. MultiLangDaemon Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Misalnya, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, lihat [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

KCL bertindak sebagai perantara antara logika pemrosesan rekaman Anda dan Kinesis Data Streams. 

## KCL versi sebelumnya
<a name="shared-throughput-kcl-consumers-versions"></a>

Saat ini, Anda dapat menggunakan salah satu dari versi KCL yang didukung berikut ini untuk membangun aplikasi konsumen kustom Anda:
+ **KCL 1.x**

  Untuk informasi selengkapnya, lihat [Kembangkan konsumen KCL 1.x](developing-consumers-with-kcl.md)
+ **KCL 2.x**

  Untuk informasi selengkapnya, lihat [Kembangkan Konsumen KCL 2.x](developing-consumers-with-kcl-v2.md)

Anda dapat menggunakan KCL 1.x atau KCL 2.x untuk membangun aplikasi konsumen yang menggunakan throughput bersama. Untuk informasi selengkapnya, lihat [Kembangkan konsumen kustom dengan throughput bersama menggunakan KCL](custom-kcl-consumers.md).

Untuk membangun aplikasi konsumen yang menggunakan throughput khusus (konsumen fan-out yang disempurnakan), Anda hanya dapat menggunakan KCL 2.x. Untuk informasi selengkapnya, lihat [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md).

Untuk informasi tentang perbedaan antara KCL 1.x dan KCL 2.x, dan petunjuk tentang cara bermigrasi dari KCL 1.x ke KCL 2.x, lihat. [Migrasikan konsumen dari KCL 1.x ke KCL 2.x](kcl-migration.md)

## Konsep KCL (versi sebelumnya)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **Aplikasi konsumen KCL** — aplikasi yang dibuat khusus menggunakan KCL dan dirancang untuk membaca dan memproses catatan dari aliran data. 
+ **Contoh aplikasi konsumen** - Aplikasi konsumen KCL biasanya didistribusikan, dengan satu atau lebih instance aplikasi berjalan secara bersamaan untuk mengoordinasikan kegagalan dan pemrosesan catatan data keseimbangan beban secara dinamis.
+ **Worker** — kelas tingkat tinggi yang digunakan instance aplikasi konsumen KCL untuk mulai memproses data. 
**penting**  
Setiap instance aplikasi konsumen KCL memiliki satu pekerja. 

  Pekerja menginisialisasi dan mengawasi berbagai tugas, termasuk menyinkronkan informasi pecahan dan sewa, melacak tugas pecahan, dan memproses data dari pecahan. Seorang pekerja memberi KCL informasi konfigurasi untuk aplikasi konsumen, seperti nama aliran data yang datanya mencatat aplikasi konsumen KCL ini akan diproses dan AWS kredensil yang diperlukan untuk mengakses aliran data ini. Pekerja juga memulai instance aplikasi konsumen KCL tertentu untuk mengirimkan catatan data dari aliran data ke prosesor rekaman.
**penting**  
**Dalam KCL 1.x kelas ini disebut Worker.** [Untuk informasi lebih lanjut, (ini adalah repositori Java KCL), lihat https://github.com/awslabs/amazon-kinesis-client/.java. blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) **Dalam KCL 2.x, kelas ini disebut Scheduler.** Tujuan Scheduler di KCL 2.x identik dengan tujuan Pekerja di KCL 1.x. [Untuk informasi selengkapnya tentang kelas Scheduler di KCL 2.x, lihat https://github.com/awslabs/amazon-kinesis-client/.java. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java) 
+ **Sewa** — data yang mendefinisikan pengikatan antara pekerja dan pecahan. Aplikasi konsumen KCL terdistribusi menggunakan sewa untuk mempartisi pemrosesan catatan data di seluruh armada pekerja. **Pada waktu tertentu, setiap pecahan catatan data terikat pada pekerja tertentu dengan sewa yang diidentifikasi oleh variabel LeaseKey.** 

  Secara default, seorang pekerja dapat memegang satu atau lebih sewa (tunduk pada nilai variabel **maxLeasesForWorker**) pada saat yang sama. 
**penting**  
Setiap pekerja akan bersaing untuk memegang semua sewa yang tersedia untuk semua pecahan yang tersedia dalam aliran data. Tetapi hanya satu pekerja yang akan berhasil memegang setiap sewa pada satu waktu. 

  Misalnya, jika Anda memiliki instance aplikasi konsumen A dengan pekerja A yang memproses aliran data dengan 4 pecahan, pekerja A dapat menyimpan sewa ke pecahan 1, 2, 3, dan 4 secara bersamaan. Tetapi jika Anda memiliki dua instance aplikasi konsumen: A dan B dengan pekerja A dan pekerja B, dan instance ini memproses aliran data dengan 4 pecahan, pekerja A dan pekerja B tidak dapat menahan sewa ke shard 1 secara bersamaan. Seorang pekerja memegang sewa ke pecahan tertentu sampai siap untuk berhenti memproses catatan data pecahan ini atau sampai gagal. Ketika satu pekerja berhenti memegang sewa, pekerja lain mengambil dan memegang sewa. 

  [Untuk informasi lebih lanjut, (ini adalah repositori Java KCL), lihat [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java untuk KCL 1.x dan https://github.com/awslabs/amazon-kinesis-client/.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) untuk KCL 2.x. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java)
+ **Tabel sewa - tabel** Amazon DynamoDB unik yang digunakan untuk melacak pecahan dalam aliran data KDS yang disewakan dan diproses oleh pekerja aplikasi konsumen KCL. Tabel sewa harus tetap sinkron (di dalam pekerja dan di semua pekerja) dengan informasi pecahan terbaru dari aliran data saat aplikasi konsumen KCL berjalan. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](#shared-throughput-kcl-consumers-leasetable).
+ **Rekam prosesor** — logika yang mendefinisikan bagaimana aplikasi konsumen KCL Anda memproses data yang didapatnya dari aliran data. Saat runtime, instance aplikasi konsumen KCL membuat instance pekerja, dan pekerja ini membuat instance satu prosesor rekaman untuk setiap pecahan yang disewanya. 

## Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [Apa itu meja sewa](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [Throughput](#shared-throughput-kcl-leasetable-throughput)
+ [Bagaimana tabel sewa disinkronkan dengan pecahan dalam aliran data Kinesis](#shared-throughput-kcl-consumers-leasetable-sync)

### Apa itu meja sewa
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

Untuk setiap aplikasi Amazon Kinesis Data Streams, KCL menggunakan tabel sewa unik (disimpan dalam tabel Amazon DynamoDB) untuk melacak pecahan dalam aliran data KDS yang disewakan dan diproses oleh pekerja aplikasi konsumen KCL.

**penting**  
KCL menggunakan nama aplikasi konsumen untuk membuat nama tabel sewa yang digunakan aplikasi konsumen ini, oleh karena itu setiap nama aplikasi konsumen harus unik.

Anda dapat melihat tabel sewa menggunakan konsol [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) saat aplikasi konsumen sedang berjalan.

Jika tabel sewa untuk aplikasi konsumen KCL Anda tidak ada saat aplikasi dimulai, salah satu pekerja membuat tabel sewa untuk aplikasi ini. 

**penting**  
 Akun Anda dikenakan biaya untuk biaya yang terkait dengan tabel DynamoDB, selain biaya yang terkait dengan Kinesis Data Streams itu sendiri. 

Setiap baris dalam tabel sewa mewakili pecahan yang sedang diproses oleh pekerja aplikasi konsumen Anda. Jika aplikasi konsumen KCL Anda hanya memproses satu aliran data`leaseKey`, maka kunci hash untuk tabel sewa adalah ID pecahan. Jika ya[Memproses beberapa aliran data dengan KCL 2.x yang sama untuk aplikasi konsumen Java](#shared-throughput-kcl-multistream), maka struktur LeaseKey terlihat seperti ini:. `account-id:StreamName:streamCreationTimestamp:ShardId` Misalnya, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

Selain ID pecahan, setiap baris juga menyertakan data berikut:
+ **pos pemeriksaan:** Nomor urutan pos pemeriksaan terbaru untuk pecahan. Nilai ini unik di semua pecahan dalam aliran data.
+ **checkpointSubSequenceNomor:** Saat menggunakan fitur agregasi Perpustakaan Produser Kinesis, ini adalah ekstensi ke **pos pemeriksaan** yang melacak catatan pengguna individu dalam catatan Kinesis.
+ **LeaseCounter**: Digunakan untuk pembuatan versi sewa sehingga pekerja dapat mendeteksi bahwa sewa mereka telah diambil oleh pekerja lain.
+ **LeaseKey**: Pengidentifikasi unik untuk sewa. Setiap sewa khusus untuk pecahan dalam aliran data dan dipegang oleh satu pekerja pada satu waktu.
+ **LeaseOwner**: Pekerja yang memegang sewa ini.
+ **ownerSwitchesSincePos pemeriksaan:** Berapa kali sewa ini telah berganti pekerja sejak terakhir kali pos pemeriksaan ditulis.
+ **parentShardId:** Digunakan untuk memastikan bahwa pecahan induk diproses sepenuhnya sebelum pemrosesan dimulai pada pecahan anak. Ini memastikan bahwa catatan diproses dalam urutan yang sama dengan yang dimasukkan ke dalam aliran.
+ **hashrange:** Digunakan oleh `PeriodicShardSyncManager` untuk menjalankan sinkronisasi berkala untuk menemukan pecahan yang hilang di tabel sewa dan membuat sewa untuk mereka jika diperlukan. 
**catatan**  
Data ini hadir dalam tabel sewa untuk setiap pecahan dimulai dengan KCL 1.14 dan KCL 2.3. Untuk informasi lebih lanjut tentang `PeriodicShardSyncManager` dan sinkronisasi berkala antara sewa dan pecahan, lihat. [Bagaimana tabel sewa disinkronkan dengan pecahan dalam aliran data Kinesis](#shared-throughput-kcl-consumers-leasetable-sync)
+ **childshards:** Digunakan oleh `LeaseCleanupManager` untuk meninjau status pemrosesan pecahan anak dan memutuskan apakah pecahan induk dapat dihapus dari tabel sewa.
**catatan**  
Data ini hadir dalam tabel sewa untuk setiap pecahan dimulai dengan KCL 1.14 dan KCL 2.3.
+ **ShardID:** ID pecahan.
**catatan**  
Data ini hanya ada di tabel sewa jika Anda berada[Memproses beberapa aliran data dengan KCL 2.x yang sama untuk aplikasi konsumen Java](#shared-throughput-kcl-multistream). Ini hanya didukung di KCL 2.x untuk Java, dimulai dengan KCL 2.3 untuk Java dan yang lebih baru. 
+ **nama aliran** Pengidentifikasi aliran data dalam format berikut:`account-id:StreamName:streamCreationTimestamp`.
**catatan**  
Data ini hanya ada di tabel sewa jika Anda berada[Memproses beberapa aliran data dengan KCL 2.x yang sama untuk aplikasi konsumen Java](#shared-throughput-kcl-multistream). Ini hanya didukung di KCL 2.x untuk Java, dimulai dengan KCL 2.3 untuk Java dan yang lebih baru. 

### Throughput
<a name="shared-throughput-kcl-leasetable-throughput"></a>

Jika aplikasi Amazon Kinesis Data Streams menerima pengecualian throughput yang disediakan, Anda harus meningkatkan throughput yang disediakan untuk tabel DynamoDB. KCL membuat tabel dengan throughput yang disediakan 10 pembacaan per detik dan 10 penulisan per detik, tetapi ini mungkin tidak cukup untuk aplikasi Anda. Misalnya, jika aplikasi Amazon Kinesis Data Streams sering melakukan pemeriksaan atau beroperasi pada aliran yang terdiri dari banyak pecahan, Anda mungkin memerlukan lebih banyak throughput.

*Untuk informasi tentang throughput yang disediakan di DynamoDB, lihat [Mode Kapasitas Baca/Tulis](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) dan Bekerja dengan [Tabel dan Data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) di Panduan Pengembang Amazon DynamoDB.*

### Bagaimana tabel sewa disinkronkan dengan pecahan dalam aliran data Kinesis
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

Pekerja dalam aplikasi konsumen KCL menggunakan sewa untuk memproses pecahan dari aliran data tertentu. Informasi tentang pekerja apa yang menyewakan pecahan apa pada waktu tertentu disimpan dalam tabel sewa. Tabel sewa harus tetap sinkron dengan informasi pecahan terbaru dari aliran data saat aplikasi konsumen KCL berjalan. KCL menyinkronkan tabel sewa dengan informasi pecahan yang diperoleh dari layanan Kinesis Data Streams selama bootstrap aplikasi konsumen (baik ketika aplikasi konsumen diinisialisasi atau dimulai ulang) dan juga setiap kali pecahan yang sedang diproses mencapai akhir (resharding). Dengan kata lain, pekerja atau aplikasi konsumen KCL disinkronkan dengan aliran data yang mereka proses selama bootstrap aplikasi konsumen awal dan setiap kali aplikasi konsumen menemukan peristiwa reshard aliran data.

**Topics**
+ [Sinkronisasi di KCL 1.0 - 1.13 dan KCL 2.0 - 2.2](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [Sinkronisasi di KCL 2.x, dimulai dengan KCL 2.3 dan yang lebih baru](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [Sinkronisasi di KCL 1.x, dimulai dengan KCL 1.14 dan yang lebih baru](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Sinkronisasi di KCL 1.0 - 1.13 dan KCL 2.0 - 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

Dalam KCL 1.0 - 1.13 dan KCL 2.0 - 2.2, selama bootstrap aplikasi konsumen dan juga selama setiap peristiwa reshard aliran data, KCL menyinkronkan tabel sewa dengan informasi pecahan yang diperoleh dari layanan Kinesis Data Streams dengan menjalankan atau penemuan. `ListShards` `DescribeStream` APIs Dalam semua versi KCL yang tercantum di atas, setiap pekerja aplikasi konsumen KCL menyelesaikan langkah-langkah berikut untuk melakukan proses lease/shard sinkronisasi selama bootstrap aplikasi konsumen dan pada setiap acara reshard aliran:
+ Mengambil semua pecahan untuk data aliran yang sedang diproses
+ Mengambil semua sewa pecahan dari tabel sewa
+ Menyaring setiap pecahan terbuka yang tidak memiliki sewa di tabel sewa
+ Mengulangi semua pecahan terbuka yang ditemukan dan untuk setiap pecahan terbuka tanpa induk terbuka:
  + Melintasi pohon hierarki melalui jalur leluhurnya untuk menentukan apakah pecahan itu adalah keturunan. Pecahan dianggap sebagai keturunan, jika pecahan leluhur sedang diproses (entri sewa untuk pecahan leluhur ada di tabel sewa) atau jika pecahan leluhur harus diproses (misalnya, jika posisi awal adalah atau) `TRIM_HORIZON` `AT_TIMESTAMP`
  + Jika pecahan terbuka dalam konteks adalah keturunan, KCL memeriksa pecahan berdasarkan posisi awal dan membuat sewa untuk orang tuanya, jika diperlukan

#### Sinkronisasi di KCL 2.x, dimulai dengan KCL 2.3 dan yang lebih baru
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

Dimulai dengan versi terbaru yang didukung dari KCL 2.x (KCL 2.3) dan yang lebih baru, perpustakaan sekarang mendukung perubahan berikut pada proses sinkronisasi. Perubahan lease/shard sinkronisasi ini secara signifikan mengurangi jumlah panggilan API yang dilakukan oleh aplikasi konsumen KCL ke layanan Kinesis Data Streams dan mengoptimalkan manajemen sewa di aplikasi konsumen KCL Anda. 
+ Selama bootstrap aplikasi, jika tabel sewa kosong, KCL menggunakan opsi pemfilteran `ListShard` API (parameter permintaan `ShardFilter` opsional) untuk mengambil dan membuat sewa hanya untuk snapshot pecahan yang terbuka pada waktu yang ditentukan oleh parameter. `ShardFilter` `ShardFilter`Parameter ini memungkinkan Anda untuk memfilter respons `ListShards` API. Satu-satunya properti yang diperlukan dari `ShardFilter` parameter adalah`Type`. KCL menggunakan properti `Type` filter dan berikut nilai validnya untuk mengidentifikasi dan mengembalikan snapshot pecahan terbuka yang mungkin memerlukan sewa baru:
  + `AT_TRIM_HORIZON`- Responsnya mencakup semua pecahan yang terbuka di`TRIM_HORIZON`. 
  + `AT_LATEST`- Respons hanya mencakup pecahan aliran data yang saat ini terbuka. 
  + `AT_TIMESTAMP`- respons mencakup semua pecahan yang stempel waktu awalnya kurang dari atau sama dengan stempel waktu yang diberikan dan stempel waktu akhir lebih besar dari atau sama dengan stempel waktu yang diberikan atau masih terbuka.

  `ShardFilter`digunakan saat membuat sewa untuk tabel sewa kosong untuk menginisialisasi sewa untuk snapshot pecahan yang ditentukan di. `RetrievalConfig#initialPositionInStreamExtended`

  Untuk informasi selengkapnya tentang `ShardFilter`, lihat [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Alih-alih semua pekerja melakukan lease/shard sinkronisasi untuk menjaga tabel sewa tetap up to date dengan pecahan terbaru dalam aliran data, satu pemimpin pekerja terpilih melakukan sinkronisasi sewa/pecahan.
+ KCL 2.3 menggunakan parameter `ChildShards` pengembalian `GetRecords` dan `SubscribeToShard` APIs untuk melakukan lease/shard sinkronisasi yang terjadi pada `SHARD_END` pecahan tertutup, memungkinkan pekerja KCL untuk hanya membuat sewa untuk pecahan anak dari pecahan yang selesai diproses. Untuk dibagikan di seluruh aplikasi konsumen, optimalisasi lease/shard sinkronisasi ini menggunakan `ChildShards` parameter `GetRecords` API. Untuk aplikasi konsumen throughput khusus (fan-out yang disempurnakan), optimalisasi lease/shard sinkronisasi ini menggunakan `ChildShards` parameter API. `SubscribeToShard` Lihat informasi selengkapnya di [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html), dan [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Dengan perubahan di atas, perilaku KCL bergerak dari model semua pekerja yang belajar tentang semua pecahan yang ada ke model pekerja yang hanya belajar tentang pecahan pecahan anak-anak yang dimiliki setiap pekerja. Oleh karena itu, selain sinkronisasi yang terjadi selama bootstraping aplikasi konsumen dan peristiwa reshard, KCL sekarang juga melakukan shard/lease pemindaian berkala tambahan untuk mengidentifikasi lubang potensial dalam tabel sewa (dengan kata lain, untuk mempelajari semua pecahan baru) untuk memastikan rentang hash lengkap dari aliran data sedang diproses dan membuat sewa untuk mereka jika diperlukan. `PeriodicShardSyncManager`adalah komponen yang bertanggung jawab untuk menjalankan lease/shard pemindaian berkala. 

  Untuk informasi selengkapnya tentang `PeriodicShardSyncManager` di KCL 2.3, lihat [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java \$1L201](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213) -L213.

  Di KCL 2.3, opsi konfigurasi baru tersedia untuk dikonfigurasi `PeriodicShardSyncManager` di`LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/shared-throughput-kcl-consumers.html)

   CloudWatch Metrik baru juga sekarang dipancarkan untuk memantau kesehatan. `PeriodicShardSyncManager` Untuk informasi selengkapnya, lihat [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ Termasuk optimasi `HierarchicalShardSyncer` untuk hanya membuat sewa untuk satu lapisan pecahan.

#### Sinkronisasi di KCL 1.x, dimulai dengan KCL 1.14 dan yang lebih baru
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

Dimulai dengan versi terbaru yang didukung dari KCL 1.x (KCL 1.14) dan yang lebih baru, perpustakaan sekarang mendukung perubahan berikut pada proses sinkronisasi. Perubahan lease/shard sinkronisasi ini secara signifikan mengurangi jumlah panggilan API yang dilakukan oleh aplikasi konsumen KCL ke layanan Kinesis Data Streams dan mengoptimalkan manajemen sewa di aplikasi konsumen KCL Anda. 
+ Selama bootstrap aplikasi, jika tabel sewa kosong, KCL menggunakan opsi pemfilteran `ListShard` API (parameter permintaan `ShardFilter` opsional) untuk mengambil dan membuat sewa hanya untuk snapshot pecahan yang terbuka pada waktu yang ditentukan oleh parameter. `ShardFilter` `ShardFilter`Parameter ini memungkinkan Anda untuk memfilter respons `ListShards` API. Satu-satunya properti yang diperlukan dari `ShardFilter` parameter adalah`Type`. KCL menggunakan properti `Type` filter dan berikut nilai validnya untuk mengidentifikasi dan mengembalikan snapshot pecahan terbuka yang mungkin memerlukan sewa baru:
  + `AT_TRIM_HORIZON`- Responsnya mencakup semua pecahan yang terbuka di`TRIM_HORIZON`. 
  + `AT_LATEST`- Respons hanya mencakup pecahan aliran data yang saat ini terbuka. 
  + `AT_TIMESTAMP`- respons mencakup semua pecahan yang stempel waktu awalnya kurang dari atau sama dengan stempel waktu yang diberikan dan stempel waktu akhir lebih besar dari atau sama dengan stempel waktu yang diberikan atau masih terbuka.

  `ShardFilter`digunakan saat membuat sewa untuk tabel sewa kosong untuk menginisialisasi sewa untuk snapshot pecahan yang ditentukan di. `KinesisClientLibConfiguration#initialPositionInStreamExtended`

  Untuk informasi selengkapnya tentang `ShardFilter`, lihat [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Alih-alih semua pekerja melakukan lease/shard sinkronisasi untuk menjaga tabel sewa tetap up to date dengan pecahan terbaru dalam aliran data, satu pemimpin pekerja terpilih melakukan sinkronisasi sewa/pecahan.
+ KCL 1.14 menggunakan parameter `ChildShards` pengembalian `GetRecords` dan `SubscribeToShard` APIs untuk melakukan lease/shard sinkronisasi yang terjadi pada `SHARD_END` pecahan tertutup, memungkinkan pekerja KCL untuk hanya membuat sewa untuk pecahan anak dari pecahan yang selesai diproses. Untuk informasi selengkapnya, lihat [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) dan [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Dengan perubahan di atas, perilaku KCL bergerak dari model semua pekerja yang belajar tentang semua pecahan yang ada ke model pekerja yang hanya belajar tentang pecahan pecahan anak-anak yang dimiliki setiap pekerja. Oleh karena itu, selain sinkronisasi yang terjadi selama bootstraping aplikasi konsumen dan peristiwa reshard, KCL sekarang juga melakukan shard/lease pemindaian berkala tambahan untuk mengidentifikasi lubang potensial dalam tabel sewa (dengan kata lain, untuk mempelajari semua pecahan baru) untuk memastikan rentang hash lengkap dari aliran data sedang diproses dan membuat sewa untuk mereka jika diperlukan. `PeriodicShardSyncManager`adalah komponen yang bertanggung jawab untuk menjalankan lease/shard pemindaian berkala. 

  Kapan `KinesisClientLibConfiguration#shardSyncStrategyType` diatur ke`ShardSyncStrategyType.SHARD_END`, `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` digunakan untuk menentukan ambang batas untuk jumlah pemindaian berturut-turut yang berisi lubang di tabel sewa setelah itu untuk menegakkan sinkronisasi pecahan. Kapan `KinesisClientLibConfiguration#shardSyncStrategyType` diatur ke`ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` diabaikan.

  Untuk informasi lebih lanjut tentang `PeriodicShardSyncManager` di KCL 1.14, lihat [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999) \$1L987 -L999.

  Di KCL 1.14, opsi konfigurasi baru tersedia untuk dikonfigurasi `PeriodicShardSyncManager` di: `LeaseManagementConfig`    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/shared-throughput-kcl-consumers.html)

   CloudWatch Metrik baru juga sekarang dipancarkan untuk memantau kesehatan. `PeriodicShardSyncManager` Untuk informasi selengkapnya, lihat [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 1.14 sekarang juga mendukung pembersihan sewa yang ditangguhkan. Sewa dihapus secara asinkron `LeaseCleanupManager` pada saat mencapai`SHARD_END`, ketika pecahan telah kedaluwarsa melewati periode retensi aliran data atau ditutup sebagai hasil dari operasi resharding.

  Opsi konfigurasi baru tersedia untuk dikonfigurasi`LeaseCleanupManager`.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ Termasuk optimasi `KinesisShardSyncer` untuk hanya membuat sewa untuk satu lapisan pecahan.

## Memproses beberapa aliran data dengan KCL 2.x yang sama untuk aplikasi konsumen Java
<a name="shared-throughput-kcl-multistream"></a>

Bagian ini menjelaskan perubahan berikut dalam KCL 2.x untuk Java yang memungkinkan Anda membuat aplikasi konsumen KCL yang dapat memproses lebih dari satu aliran data pada saat yang bersamaan. 

**penting**  
Pemrosesan multistream hanya didukung di KCL 2.x untuk Java, dimulai dengan KCL 2.3 untuk Java dan yang lebih baru.   
Pemrosesan multistream TIDAK didukung untuk bahasa lain di mana KCL 2.x dapat diimplementasikan.  
Pemrosesan multistream TIDAK didukung dalam versi KCL 1.x apa pun.
+ **MultistreamTracker antarmuka**

  Untuk membangun aplikasi konsumen yang dapat memproses beberapa aliran pada saat yang sama, Anda harus menerapkan antarmuka baru yang disebut [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Antarmuka ini mencakup `streamConfigList` metode yang mengembalikan daftar aliran data dan konfigurasinya untuk diproses oleh aplikasi konsumen KCL. Perhatikan bahwa aliran data yang sedang diproses dapat diubah selama runtime aplikasi konsumen. `streamConfigList`disebut secara berkala oleh KCL untuk mempelajari tentang perubahan aliran data untuk diproses.

  `streamConfigList`Metode ini mengisi [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)daftar. 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Perhatikan bahwa bidang `StreamIdentifier` dan `InitialPositionInStreamExtended` wajib, sementara `consumerArn` adalah opsional. Anda harus menyediakan `consumerArn` satu-satunya jika Anda menggunakan KCL 2.x untuk mengimplementasikan aplikasi konsumen penggemar yang disempurnakan.

  Untuk informasi selengkapnya`StreamIdentifier`, lihat [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). Untuk membuat`StreamIdentifier`, kami sarankan Anda membuat instance multistream dari `streamArn` dan yang tersedia di v2.5.0 dan `streamCreationEpoch` yang lebih baru. Di KCL v2.3 dan v2.4, yang tidak mendukung`streamArm`, buat instance multistream dengan menggunakan format. `account-id:StreamName:streamCreationTimestamp` Format ini akan usang dan tidak lagi didukung dimulai dengan rilis utama berikutnya.

  `MultistreamTracker`juga mencakup strategi untuk menghapus sewa aliran lama di tabel sewa (). `formerStreamsLeasesDeletionStrategy` Perhatikan bahwa strategi TIDAK DAPAT diubah selama runtime aplikasi konsumen. Untuk informasi lebih lanjut, lihat [https://github.com/awslabs/amazon-kinesis-clientb/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy/blob/0c5042dadf794fe988438436252a5a8fe70b6b0](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) .java
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)adalah kelas seluruh aplikasi yang dapat Anda gunakan untuk menentukan semua pengaturan konfigurasi KCL 2.x yang akan digunakan saat membangun aplikasi konsumen KCL Anda. `ConfigsBuilder`kelas sekarang memiliki dukungan untuk `MultistreamTracker` antarmuka. Anda dapat menginisialisasi ConfigsBuilder baik dengan nama satu aliran data untuk menggunakan catatan dari:

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  Atau Anda dapat menginisialisasi ConfigsBuilder dengan `MultiStreamTracker` jika Anda ingin mengimplementasikan aplikasi konsumen KCL yang memproses beberapa aliran secara bersamaan.

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ Dengan dukungan multistream yang diterapkan untuk aplikasi konsumen KCL Anda, setiap baris tabel sewa aplikasi sekarang berisi ID pecahan dan nama aliran dari beberapa aliran data yang diproses aplikasi ini. 
+ Ketika dukungan multistream untuk aplikasi konsumen KCL Anda diimplementasikan, LeaseKey mengambil struktur berikut: `account-id:StreamName:streamCreationTimestamp:ShardId` Misalnya, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
**penting**  
Ketika aplikasi konsumen KCL Anda yang ada dikonfigurasi untuk memproses hanya satu aliran data, LeaseKey (yang merupakan kunci hash untuk tabel sewa) adalah ID pecahan. Jika Anda mengkonfigurasi ulang aplikasi konsumen KCL yang ada ini untuk memproses beberapa aliran data, itu merusak tabel sewa Anda, karena dengan dukungan multistream, struktur LeaseKey harus sebagai berikut:. `account-id:StreamName:StreamCreationTimestamp:ShardId`

## Gunakan KCL dengan AWS Glue Schema Registry
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

Anda dapat mengintegrasikan aliran data Kinesis Anda dengan Registri Skema. AWS Glue Registri AWS Glue Skema memungkinkan Anda untuk menemukan, mengontrol, dan mengembangkan skema secara terpusat, sambil memastikan data yang dihasilkan terus divalidasi oleh skema terdaftar. Sebuah skema mendefinisikan struktur dan format catatan data. Sebuah skema adalah sebuah spesifikasi berversi untuk publikasi data yang handal, konsumsi, atau penyimpanan. Registri AWS Glue Skema memungkinkan Anda meningkatkan kualitas end-to-end data dan tata kelola data dalam aplikasi streaming Anda. Untuk informasi selengkapnya, lihat [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Salah satu cara untuk mengatur integrasi ini adalah melalui KCL di Jawa. 

**penting**  
Saat ini, integrasi Kinesis Data AWS Glue Streams dan Schema Registry hanya didukung untuk aliran data Kinesis yang menggunakan konsumen KCL 2.3 yang diterapkan di Jawa. Support multi-bahasa tidak tersedia. Konsumen KCL 1.0 tidak didukung. Konsumen KCL 2.x sebelum KCL 2.3 tidak didukung.

Untuk petunjuk terperinci tentang cara mengatur integrasi Kinesis Data Streams dengan Schema Registry menggunakan KCL, lihat [bagian “Berinteraksi dengan Data Menggunakan Pustaka” di Kasus KPL/KCL Penggunaan: Mengintegrasikan Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) dengan Glue Schema Registry. AWS 

# Kembangkan konsumen khusus dengan throughput bersama
<a name="shared-throughput-consumers"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Jika Anda tidak memerlukan throughput khusus saat menerima data dari Kinesis Data Streams, dan jika Anda tidak memerlukan penundaan propagasi baca di bawah 200 ms, Anda dapat membuat aplikasi konsumen seperti yang dijelaskan dalam topik berikut. Anda dapat menggunakan Kinesis Client Library (KCL) atau. AWS SDK untuk Java

**Topics**
+ [Kembangkan konsumen kustom dengan throughput bersama menggunakan KCL](custom-kcl-consumers.md)

Untuk informasi tentang membangun konsumen yang dapat menerima catatan dari aliran data Kinesis dengan throughput khusus, lihat. [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md)

# Kembangkan konsumen kustom dengan throughput bersama menggunakan KCL
<a name="custom-kcl-consumers"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Salah satu metode pengembangan aplikasi konsumen khusus dengan throughput bersama adalah dengan menggunakan Kinesis Client Library (KCL). 

Pilih dari topik berikut untuk versi KCL yang Anda gunakan.

**Topics**
+ [Kembangkan konsumen KCL 1.x](developing-consumers-with-kcl.md)
+ [Kembangkan Konsumen KCL 2.x](developing-consumers-with-kcl-v2.md)

# Kembangkan konsumen KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat mengembangkan aplikasi konsumen untuk Amazon Kinesis Data Streams menggunakan Kinesis Client Library (KCL). 

Untuk informasi lebih lanjut tentang KCL, lihat[Tentang KCL (versi sebelumnya)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Pilih dari topik berikut tergantung pada opsi yang ingin Anda gunakan.

**Topics**
+ [Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa](kinesis-record-processor-implementation-app-java.md)
+ [Mengembangkan konsumen Perpustakaan Klien Kinesis di Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Mengembangkan konsumen Perpustakaan Klien Kinesis di .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python](kinesis-record-processor-implementation-app-py.md)
+ [Mengembangkan Konsumen Perpustakaan Klien Kinesis di Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
<a name="kinesis-record-processor-implementation-app-java"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Java. Untuk melihat referensi Javadoc, lihat topik [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) untuk Kelas. AmazonKinesisClient

Untuk mengunduh Java KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Untuk menemukan Java KCL di Apache Maven, buka halaman hasil pencarian [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Untuk mengunduh kode sampel untuk aplikasi konsumen Java KCL dari GitHub, buka halaman [proyek sampel KCL untuk Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis). GitHub 

Aplikasi sampel menggunakan [Apache Commons Logging.](http://commons.apache.org/proper/commons-logging/guide.html) Anda dapat mengubah konfigurasi logging dalam `configure` metode statis yang ditentukan dalam `AmazonKinesisApplicationSample.java` file. *Untuk informasi selengkapnya tentang cara menggunakan Apache Commons Logging dengan aplikasi Log4j dan AWS Java, lihat [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) di Panduan Pengembang.AWS SDK untuk Java *

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Jawa:

**Topics**
+ [Menerapkan metode IRecord Processor](#kinesis-record-processor-implementation-interface-java)
+ [Menerapkan pabrik kelas untuk antarmuka IRecord Processor](#kinesis-record-processor-implementation-factory-java)
+ [Buat pekerja](#kcl-java-worker)
+ [Ubah properti konfigurasi](#kinesis-record-processor-initialization-java)
+ [Migrasi ke Versi 2 dari antarmuka prosesor rekaman](#kcl-java-v2-migration)

## Menerapkan metode IRecord Processor
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL saat ini mendukung dua versi antarmuka`IRecordProcessor`: Antarmuka asli tersedia dengan versi pertama KCL, dan versi 2 tersedia dimulai dengan KCL versi 1.5.0. Kedua antarmuka didukung penuh. Pilihan Anda tergantung pada persyaratan skenario spesifik Anda. Lihat Javadocs yang dibuat secara lokal atau kode sumber untuk melihat semua perbedaannya. Bagian berikut menguraikan implementasi minimal untuk memulai.

**Topics**
+ [Antarmuka Asli (Versi 1)](#kcl-java-interface-original)
+ [Antarmuka yang diperbarui (Versi 2)](#kcl-java-interface-v2)

### Antarmuka Asli (Versi 1)
<a name="kcl-java-interface-original"></a>

`IRecordProcessor`Antarmuka asli (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) memperlihatkan metode prosesor rekaman berikut yang harus diterapkan konsumen Anda. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**menginisialisasi**  
KCL memanggil `initialize` metode ketika prosesor rekaman dipakai, melewati ID pecahan tertentu sebagai parameter. Prosesor rekaman ini hanya memproses pecahan ini dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Kinesis Data *Streams memiliki semantik setidaknya* sekali, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
KCL memanggil `processRecords` metode, melewati daftar catatan data dari pecahan yang ditentukan oleh metode. `initialize(shardId)` Prosesor rekaman memproses data dalam catatan ini sesuai dengan semantik konsumen. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `Record`Kelas mengekspos metode berikut yang menyediakan akses ke data catatan, nomor urut, dan kunci partisi. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Dalam sampel, metode privat `processRecordsWithRetries` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan meneruskan checkpointer () `IRecordProcessorCheckpointer` ke. `processRecords` Prosesor rekaman memanggil `checkpoint` metode pada antarmuka ini untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau merge, KCL tidak akan mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`processRecords`. Prosesor dapat, misalnya, memanggil `checkpoint` setiap panggilan ketiga ke`processRecords`. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Dalam sampel, metode pribadi `checkpoint` menunjukkan cara memanggil `IRecordProcessorCheckpointer.checkpoint` menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL bergantung pada `processRecords` untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Jika pengecualian dilemparkan`processRecords`, KCL melompati catatan data yang diteruskan sebelum pengecualian. Artinya, catatan ini tidak dikirim kembali ke prosesor rekaman yang melemparkan pengecualian atau ke prosesor rekaman lainnya di konsumen.

**penonaktifan**  
KCL memanggil `shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (alasan shutdown adalah). `ZOMBIE`

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `IRecordProcessorCheckpointer` antarmuka ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

### Antarmuka yang diperbarui (Versi 2)
<a name="kcl-java-interface-v2"></a>

`IRecordProcessor`Antarmuka (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) yang diperbarui memperlihatkan metode prosesor rekaman berikut yang harus diterapkan konsumen Anda: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Semua argumen dari versi asli antarmuka dapat diakses melalui metode get pada objek kontainer. Misalnya, untuk mengambil daftar catatan di`processRecords()`, Anda dapat menggunakan`processRecordsInput.getRecords()`.

Pada versi 2 antarmuka ini (KCL 1.5.0 dan yang lebih baru), input baru berikut tersedia selain input yang disediakan oleh antarmuka asli:

nomor urut awal  
Dalam `InitializationInput` objek yang diteruskan ke `initialize()` operasi, nomor urut awal dari mana catatan akan diberikan ke instance prosesor rekaman. Ini adalah nomor urut yang terakhir diperiksa oleh instance prosesor rekaman yang sebelumnya memproses pecahan yang sama. Ini disediakan jika aplikasi Anda membutuhkan informasi ini. 

nomor urut pos pemeriksaan yang tertunda  
Dalam `InitializationInput` objek yang diteruskan ke `initialize()` operasi, nomor urutan pos pemeriksaan yang tertunda (jika ada) yang tidak dapat dilakukan sebelum instance prosesor rekaman sebelumnya berhenti. 

## Menerapkan pabrik kelas untuk antarmuka IRecord Processor
<a name="kinesis-record-processor-implementation-factory-java"></a>

Anda juga perlu mengimplementasikan pabrik untuk kelas yang mengimplementasikan metode prosesor rekaman. Ketika konsumen Anda membuat instance pekerja, ia meneruskan referensi ke pabrik ini.

Sampel mengimplementasikan kelas pabrik dalam file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` menggunakan antarmuka prosesor rekaman asli. Jika Anda ingin pabrik kelas membuat prosesor rekaman versi 2, gunakan nama paket`com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Buat pekerja
<a name="kcl-java-worker"></a>

Seperti dibahas dalam[Menerapkan metode IRecord Processor](#kinesis-record-processor-implementation-interface-java), ada dua versi antarmuka prosesor rekaman KCL untuk dipilih, yang memengaruhi cara Anda membuat pekerja. Antarmuka prosesor rekaman asli menggunakan struktur kode berikut untuk membuat pekerja:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Dengan versi 2 dari antarmuka prosesor rekaman, Anda dapat menggunakan `Worker.Builder` untuk membuat pekerja tanpa perlu khawatir tentang konstruktor mana yang akan digunakan dan urutan argumen. Antarmuka prosesor rekaman yang diperbarui menggunakan struktur kode berikut untuk membuat pekerja:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-java"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Data konfigurasi untuk pekerja ini kemudian dikonsolidasikan dalam sebuah `KinesisClientLibConfiguration` objek. Objek ini dan referensi ke pabrik kelas untuk `IRecordProcessor` diteruskan dalam panggilan yang membuat instance pekerja. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri menggunakan file properti Java (lihat`AmazonKinesisApplicationSample.java`).

### Nama aplikasi
<a name="configuration-property-application-name"></a>

KCL memerlukan nama aplikasi yang unik di seluruh aplikasi Anda, dan di seluruh tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-cred-java"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Misalnya, jika Anda menjalankan konsumen pada instans EC2, sebaiknya Anda meluncurkan instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil bagi konsumen yang berjalan pada instans EC2.

Aplikasi sampel pertama kali mencoba untuk mengambil kredenal IAM dari metadata instance: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Jika aplikasi sampel tidak dapat memperoleh kredensil dari metadata instance, aplikasi tersebut mencoba mengambil kredensil dari file properti:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

*Untuk informasi selengkapnya tentang metadata instans, lihat [Metadata Instans di Panduan Pengguna](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) Amazon EC2.*

### Gunakan ID pekerja untuk beberapa instance
<a name="kinesis-record-processor-workerid-java"></a>

Kode inisialisasi sampel membuat ID untuk pekerja`workerId`, menggunakan nama komputer lokal dan menambahkan pengidentifikasi unik global seperti yang ditunjukkan dalam cuplikan kode berikut. Pendekatan ini mendukung skenario beberapa contoh aplikasi konsumen yang berjalan pada satu komputer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrasi ke Versi 2 dari antarmuka prosesor rekaman
<a name="kcl-java-v2-migration"></a>

Jika Anda ingin memigrasikan kode yang menggunakan antarmuka asli, selain langkah-langkah yang dijelaskan sebelumnya, langkah-langkah berikut diperlukan:

1. Ubah kelas prosesor rekaman Anda untuk mengimpor antarmuka prosesor rekaman versi 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Ubah referensi ke input untuk menggunakan `get` metode pada objek kontainer. Misalnya, dalam `shutdown()` operasi, ubah "`checkpointer`" menjadi "`shutdownInput.getCheckpointer()`”.

1. Ubah kelas pabrik prosesor rekaman Anda untuk mengimpor antarmuka pabrik prosesor rekaman versi 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Ubah konstruksi pekerja yang akan digunakan`Worker.Builder`. Contoh:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Mengembangkan konsumen Perpustakaan Klien Kinesis di Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Node.js.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Node.js dan menulis aplikasi konsumen Anda sepenuhnya di Node.js, Anda masih memerlukan Java diinstal pada sistem Anda karena. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Node.js KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client-nodejs)Node.js).

**Unduhan Kode Sampel**

Ada dua contoh kode yang tersedia untuk KCL di Node.js:
+ [sampel dasar](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Digunakan di bagian berikut untuk menggambarkan dasar-dasar membangun aplikasi konsumen KCL di Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Sedikit lebih maju dan menggunakan skenario dunia nyata, setelah Anda membiasakan diri dengan kode sampel dasar. Sampel ini tidak dibahas di sini tetapi memiliki file README dengan informasi lebih lanjut.

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Node.js:

**Topics**
+ [Implementasikan prosesor rekaman](#kinesis-record-processor-implementation-interface-nodejs)
+ [Ubah properti konfigurasi](#kinesis-record-processor-initialization-nodejs)

## Implementasikan prosesor rekaman
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

Konsumen paling sederhana yang mungkin menggunakan KCL untuk Node.js harus mengimplementasikan `recordProcessor` fungsi, yang pada gilirannya berisi fungsi`initialize`,`processRecords`, dan`shutdown`. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**menginisialisasi**  
KCL memanggil `initialize` fungsi ketika prosesor rekaman dimulai. Prosesor rekaman ini hanya memproses ID pecahan yang diteruskan sebagai`initializeInput.shardId`, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data *Streams memiliki setidaknya* sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL memanggil fungsi ini dengan input yang berisi daftar catatan data dari pecahan yang ditentukan ke fungsi tersebut`initialize`. Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi, yang dapat digunakan pekerja saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `record`Kamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:

```
record.data
record.sequenceNumber
record.partitionKey
```

Perhatikan bahwa data tersebut dikodekan oleh Base64.

Dalam sampel dasar, fungsi `processRecords` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini dengan `checkpointer` objek yang dilewatkan sebagai`processRecordsInput.checkpointer`. Prosesor rekaman Anda memanggil `checkpointer.checkpoint` fungsi untuk memberi tahu KCL seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini saat Anda memulai ulang pemrosesan pecahan sehingga berlanjut dari catatan olahan terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak meneruskan nomor urut ke `checkpoint` fungsi, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` **hanya** setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`processRecords`. Prosesor dapat, misalnya, memanggil setiap panggilan ketiga, atau beberapa peristiwa `checkpoint` di luar prosesor rekaman Anda, seperti verification/validation layanan khusus yang telah Anda terapkan. 

Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Aplikasi sampel dasar menunjukkan panggilan sesederhana mungkin ke `checkpointer.checkpoint` fungsi tersebut. Anda dapat menambahkan logika checkpointing lain yang Anda butuhkan untuk konsumen Anda pada titik ini dalam fungsi.

**penonaktifan**  
KCL memanggil `shutdown` fungsi baik saat pemrosesan berakhir (`shutdownInput.reason`is`TERMINATE`) atau pekerja tidak lagi merespons (`shutdownInput.reason`is`ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `shutdownInput.checkpointer` objek ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, Anda harus memastikan bahwa prosesor rekaman telah selesai memproses catatan data apa pun, dan kemudian memanggil `checkpoint` fungsi pada antarmuka ini.

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-nodejs"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat `sample.properties` di sampel dasar).

### Nama aplikasi
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-credentials-nodejs"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. `sample.properties`File harus membuat kredensyal Anda tersedia untuk salah satu penyedia kredensyal dalam rantai penyedia kredensi [default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Jika Anda menjalankan konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

Contoh berikut mengkonfigurasi KCL untuk memproses aliran data Kinesis bernama `kclnodejssample` menggunakan prosesor rekaman yang disediakan di: `sample_kcl_app.js`

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Mengembangkan konsumen Perpustakaan Klien Kinesis di .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas .NET.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk.NET dan menulis aplikasi konsumen Anda sepenuhnya di .NET, Anda masih perlu Java diinstal pada sistem Anda karena itu MultiLangDaemon. Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh .NET KCL dari GitHub, buka [Perpustakaan Klien Kinesis (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Untuk mengunduh kode sampel untuk aplikasi konsumen.NET KCL, buka halaman [proyek konsumen sampel KCL untuk .NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) di. GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di .NET:

**Topics**
+ [Menerapkan metode kelas IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Ubah properti konfigurasi](#kinesis-record-processor-initialization-dotnet)

## Menerapkan metode kelas IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

Konsumen harus menerapkan metode berikut untuk`IRecordProcessor`. Konsumen sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat `SampleRecordProcessor` kelas di`SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Inisialisasi**  
KCL memanggil metode ini ketika prosesor rekaman dipakai, melewati ID pecahan tertentu dalam parameter (). `input` `input.ShardId` Prosesor rekaman ini hanya memproses pecahan ini, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data *Streams memiliki setidaknya* sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL memanggil metode ini, melewati daftar catatan data dalam `input` parameter (`input.Records`) dari pecahan yang ditentukan oleh metode. `Initialize` Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `Record`Kelas mengekspos berikut ini untuk mengakses data catatan, nomor urut, dan kunci partisi:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Dalam sampel, metode ini `ProcessRecordsWithRetries` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan meneruskan `Checkpointer` objek ke `ProcessRecords` (`input.Checkpointer`). Prosesor rekaman memanggil `Checkpointer.Checkpoint` metode untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `Checkpointer.Checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan untuk `Checkpointer.Checkpoint` menandakan bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `Checkpointer.Checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `Checkpointer.Checkpoint` setiap panggilan ke`ProcessRecords`. Prosesor dapat, misalnya, memanggil `Checkpointer.Checkpoint` setiap panggilan ketiga atau keempat. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`Checkpointer.Checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa catatan telah diproses hanya hingga catatan itu.

Dalam sampel, metode pribadi `Checkpoint(Checkpointer checkpointer)` menunjukkan cara memanggil `Checkpointer.Checkpoint` metode menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL untuk.NET menangani pengecualian secara berbeda dari pustaka bahasa KCL lainnya karena tidak menangani pengecualian apa pun yang muncul dari pemrosesan catatan data. Setiap pengecualian yang tidak tertangkap dari kode pengguna akan merusak program.

**Matikan**  
KCL memanggil `Shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (nilai shutdown `input.Reason` adalah). `ZOMBIE`

```
public void Shutdown(ShutdownInput input)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan `Checkpointer` objek ke`shutdown`. Jika alasan shutdown adalah`TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-dotnet"></a>

Konsumen sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat`SampleConsumer/kcl.properties`).

### Nama aplikasi
<a name="modify-kinesis-record-processor-application-name"></a>

KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-creds-dotnet"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. [Sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) [harus membuat kredensil Anda tersedia untuk salah satu penyedia kredensional dalam rantai penyedia kredensi default.](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) Jika Anda menjalankan aplikasi konsumen pada instans EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil bagi konsumen yang berjalan pada instans EC2.

File properti sampel mengonfigurasi KCL untuk memproses aliran data Kinesis yang disebut “kata-kata” menggunakan prosesor rekaman yang disertakan. `AmazonKinesisSampleConsumer.cs` 

# Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Python.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Python KCL dari GitHub, pergi ke [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Untuk mengunduh kode sampel untuk aplikasi konsumen Python KCL, buka halaman proyek sampel [KCL untuk Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples). GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL dengan Python:

**Topics**
+ [Menerapkan metode RecordProcessor kelas](#kinesis-record-processor-implementation-interface-py)
+ [Ubah properti konfigurasi](#kinesis-record-processor-initialization-py)

## Menerapkan metode RecordProcessor kelas
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess`Kelas harus memperluas `RecordProcessorBase` untuk mengimplementasikan metode berikut. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat`sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**menginisialisasi**  
KCL memanggil `initialize` metode ketika prosesor rekaman dipakai, melewati ID pecahan tertentu sebagai parameter. Prosesor rekaman ini hanya memproses pecahan ini, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data *Streams memiliki setidaknya* sekali semantik, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihat[Gunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL memanggil metode ini, melewati daftar catatan data dari pecahan yang ditentukan oleh metode. `initialize` Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. `record`Kamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Perhatikan bahwa data tersebut dikodekan oleh Base64.

Dalam sampel, metode ini `process_records` memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan mengirimkan `Checkpointer` objek ke`process_records`. Prosesor rekaman memanggil `checkpoint` metode pada objek ini untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil `checkpoint` untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan ke `checkpoint` berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil `checkpoint` hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil `checkpoint` setiap panggilan ke`process_records`. Prosesor dapat, misalnya, memanggil `checkpoint` setiap panggilan ketiga. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untuk`checkpoint`. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Dalam sampel, metode pribadi `checkpoint` menunjukkan cara memanggil `Checkpointer.checkpoint` metode menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL bergantung pada `process_records` untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Jika pengecualian dilemparkan`process_records`, KCL melompati catatan data yang diteruskan `process_records` sebelum pengecualian. Artinya, catatan ini tidak dikirim kembali ke prosesor rekaman yang melemparkan pengecualian atau ke prosesor rekaman lainnya di konsumen.

**penonaktifan**  
 KCL memanggil `shutdown` metode baik saat pemrosesan berakhir (alasan shutdown adalah`TERMINATE`) atau pekerja tidak lagi merespons (shutdown `reason` adalah). `ZOMBIE`

```
def shutdown(self, checkpointer, reason)
```

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

 KCL juga meneruskan `Checkpointer` objek ke`shutdown`. Jika shutdown `reason``TERMINATE`, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil `checkpoint` metode pada antarmuka ini.

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-py"></a>

Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat`sample.properties`).

### Nama aplikasi
<a name="kinesis-record-processor-application-name-py"></a>

KCL memerlukan nama aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Siapkan kredensil
<a name="kinesis-record-processor-creds-py"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. [Sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) [harus membuat kredensil Anda tersedia untuk salah satu penyedia kredensional dalam rantai penyedia kredensi default.](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) Jika Anda menjalankan aplikasi konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

File properti sampel mengonfigurasi KCL untuk memproses aliran data Kinesis yang disebut “kata-kata” menggunakan prosesor rekaman yang disertakan. `sample_kclpy_app.py` 

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Ruby.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Ruby dan menulis aplikasi konsumen Anda sepenuhnya di Ruby, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Ruby KCL dari GitHub, buka [Perpustakaan Klien Kinesis](https://github.com/awslabs/amazon-kinesis-client-ruby) (Ruby). Untuk mengunduh kode sampel untuk aplikasi konsumen Ruby KCL, buka halaman proyek [sampel KCL untuk Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) di. GitHub

Untuk informasi selengkapnya tentang pustaka dukungan KCL Ruby, lihat Dokumentasi Permata [Ruby KCL](http://www.rubydoc.info/gems/aws-kclrb).

# Kembangkan Konsumen KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Topik ini menunjukkan cara menggunakan versi 2.0 dari Kinesis Client Library (KCL). 

Untuk informasi lebih lanjut tentang KCL, lihat ikhtisar yang disediakan dalam [Mengembangkan Konsumen Menggunakan Perpustakaan Klien Kinesis 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Pilih dari topik berikut tergantung pada opsi yang ingin Anda gunakan.

**Topics**
+ [Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa](kcl2-standard-consumer-java-example.md)
+ [Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python](kcl2-standard-consumer-python-example.md)
+ [Kembangkan konsumen fan-out yang disempurnakan dengan KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Mengembangkan Konsumen Perpustakaan Klien Kinesis di Jawa
<a name="kcl2-standard-consumer-java-example"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Kode berikut menunjukkan contoh implementasi di Java dari `ProcessorFactory` dan`RecordProcessor`. Jika Anda ingin memanfaatkan fitur fan-out yang disempurnakan, lihat [Menggunakan Konsumen dengan Enhanced](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html) Fan-Out.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python
<a name="kcl2-standard-consumer-python-example"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Python.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. *MultiLangDaemon* Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman [ MultiLangDaemon proyek KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Untuk mengunduh Python KCL dari GitHub, pergi ke [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Untuk mengunduh kode sampel untuk aplikasi konsumen Python KCL, buka halaman proyek sampel [KCL untuk Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples). GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL dengan Python:

**Topics**
+ [Menerapkan metode RecordProcessor kelas](#kinesis-record-processor-implementation-interface-py)
+ [Ubah properti konfigurasi](#kinesis-record-processor-initialization-py)

## Menerapkan metode RecordProcessor kelas
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess`Kelas harus memperluas `RecordProcessorBase` kelas untuk menerapkan metode berikut:

```
initialize
process_records
shutdown_requested
```

Contoh ini menyediakan implementasi yang dapat Anda gunakan sebagai titik awal.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Ubah properti konfigurasi
<a name="kinesis-record-processor-initialization-py"></a>

Sampel memberikan nilai default untuk properti konfigurasi, seperti yang ditunjukkan pada skrip berikut. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Nama aplikasi
<a name="kinesis-record-processor-application-name-py"></a>

KCL memerlukan nama aplikasi yang unik di antara aplikasi Anda dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:
+ Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan di beberapa instance. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.
+ KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat [Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Kredensial
<a name="kinesis-record-processor-creds-py"></a>

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia [kredensi default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Anda dapat menggunakan `AWSCredentialsProvider` properti untuk menetapkan penyedia kredensial. Jika Anda menjalankan aplikasi konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensil yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

# Kembangkan konsumen fan-out yang disempurnakan dengan KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Konsumen yang menggunakan *fan-out yang disempurnakan* di Amazon Kinesis Data Streams dapat menerima catatan dari aliran data dengan throughput khusus hingga 2 MB data per detik per pecahan. Konsumen jenis ini tidak harus bersaing dengan konsumen lain yang menerima data dari aliran. Untuk informasi selengkapnya, lihat [Kembangkan konsumen fan-out yang ditingkatkan dengan throughput khusus](enhanced-consumers.md).

Anda dapat menggunakan versi 2.0 atau yang lebih baru dari Kinesis Client Library (KCL) untuk mengembangkan aplikasi yang menggunakan fan-out yang disempurnakan untuk menerima data dari aliran. KCL secara otomatis berlangganan aplikasi Anda ke semua pecahan aliran, dan memastikan bahwa aplikasi konsumen Anda dapat membaca dengan nilai throughput 2 per pecahan. MB/sec Jika Anda ingin menggunakan KCL tanpa mengaktifkan fan-out yang disempurnakan, lihat [Mengembangkan Konsumen Menggunakan Kinesis Client](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html) Library 2.0.

**Topics**
+ [Kembangkan konsumen fan-out yang disempurnakan menggunakan KCL 2.x di Jawa](building-enhanced-consumers-kcl-java.md)

# Kembangkan konsumen fan-out yang disempurnakan menggunakan KCL 2.x di Jawa
<a name="building-enhanced-consumers-kcl-java"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Anda dapat menggunakan Kinesis Client Library (KCL) versi 2.0 atau yang lebih baru untuk mengembangkan aplikasi di Amazon Kinesis Data Streams untuk menerima data dari stream menggunakan fan-out yang disempurnakan. Kode berikut menunjukkan contoh implementasi di Java dari `ProcessorFactory` dan`RecordProcessor`.

Disarankan agar Anda menggunakan `KinesisClientUtil` untuk membuat `KinesisAsyncClient` dan mengkonfigurasi `maxConcurrency``KinesisAsyncClient`.

**penting**  
Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi `KinesisAsyncClient` untuk memiliki cukup `maxConcurrency` tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan. `KinesisAsyncClient`

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migrasikan konsumen dari KCL 1.x ke KCL 2.x
<a name="kcl-migration"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Topik ini menjelaskan perbedaan antara versi 1.x dan 2.x dari Kinesis Client Library (KCL). Ini juga menunjukkan kepada Anda cara memigrasikan konsumen Anda dari versi 1.x ke versi 2.x dari KCL. Setelah Anda memigrasikan klien Anda, klien akan mulai memproses catatan dari lokasi terakhir yang diperiksa.

Versi 2.0 dari KCL memperkenalkan perubahan antarmuka berikut:


**Perubahan Antarmuka KCL**  

| Antarmuka KCL 1.x | Antarmuka KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Dilipat menjadi software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrasikan prosesor rekaman](#recrod-processor-migration)
+ [Migrasikan pabrik prosesor rekaman](#recrod-processor-factory-migration)
+ [Migrasi pekerja](#worker-migration)
+ [Konfigurasikan klien Amazon Kinesis](#client-configuration)
+ [Penghapusan waktu idle](#idle-time-removal)
+ [Penghapusan konfigurasi klien](#client-configuration-removals)

## Migrasikan prosesor rekaman
<a name="recrod-processor-migration"></a>

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Untuk memigrasikan kelas prosesor rekaman**

1. Ubah antarmuka dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` dan `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` ke`software.amazon.kinesis.processor.ShardRecordProcessor`, sebagai berikut:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Perbarui `import` pernyataan untuk `initialize` dan `processRecords` metode.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Ganti `shutdown` metode dengan metode baru berikut:`leaseLost`,`shardEnded`, dan`shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Berikut ini adalah versi terbaru dari kelas prosesor rekaman.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Migrasikan pabrik prosesor rekaman
<a name="recrod-processor-factory-migration"></a>

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Untuk memigrasi pabrik prosesor rekaman**

1. Ubah antarmuka yang diimplementasikan dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` ke`software.amazon.kinesis.processor.ShardRecordProcessorFactory`, sebagai berikut.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Ubah tanda tangan kembali untuk`createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

Berikut ini adalah contoh pabrik prosesor rekaman di 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrasi pekerja
<a name="worker-migration"></a>

Dalam versi 2.0 dari KCL, kelas baru, yang disebut`Scheduler`, menggantikan kelas. `Worker` Berikut ini adalah contoh pekerja KCL 1.x.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Untuk memigrasikan pekerja**

1. Ubah `import` pernyataan untuk `Worker` kelas ke pernyataan impor untuk `Scheduler` dan `ConfigsBuilder` kelas.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Buat `ConfigsBuilder` dan a `Scheduler` seperti yang ditunjukkan pada contoh berikut.

   Disarankan agar Anda menggunakan `KinesisClientUtil` untuk membuat `KinesisAsyncClient` dan mengkonfigurasi `maxConcurrency``KinesisAsyncClient`.
**penting**  
Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi `KinesisAsyncClient` untuk memiliki cukup `maxConcurrency` tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan. `KinesisAsyncClient`

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Konfigurasikan klien Amazon Kinesis
<a name="client-configuration"></a>

Dengan rilis 2.0 dari Kinesis Client Library, konfigurasi klien dipindahkan dari kelas konfigurasi tunggal (`KinesisClientLibConfiguration`) ke enam kelas konfigurasi. Tabel berikut menjelaskan migrasi.


**Bidang Konfigurasi dan Kelas Baru Mereka**  

| Bidang Asli | Kelas Konfigurasi Baru | Deskripsi | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | Nama untuk ini aplikasi KCL. Digunakan sebagai default untuk tableName danconsumerName. | 
| tableName | ConfigsBuilder | Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB. | 
| streamName | ConfigsBuilder | Nama aliran tempat aplikasi ini memproses catatan dari. | 
| kinesisEndpoint | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| dynamoDBEndpoint | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| initialPositionInStreamExtended | RetrievalConfig | Lokasi di pecahan tempat KCL mulai mengambil catatan, dimulai dengan proses awal aplikasi. | 
| kinesisCredentialsProvider | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| failoverTimeMillis | LeaseManagementConfig | Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal. | 
| workerIdentifier | ConfigsBuilder | Pengenal unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik. | 
| shardSyncIntervalMillis | LeaseManagementConfig | Waktu antara panggilan sinkronisasi shard. | 
| maxRecords | PollingConfig | Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Opsi ini telah dihapus. Lihat Penghapusan Waktu Idle. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| dynamoDBClientConfig | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| cloudWatchClientConfig | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| taskBackoffTimeMillis | LifecycleConfig | Waktu untuk menunggu untuk mencoba kembali tugas yang gagal. | 
| metricsBufferTimeMillis | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| metricsMaxQueueSize | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| metricsLevel | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| metricsEnabledDimensions | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Opsi ini telah dihapus. Lihat Validasi Nomor Urutan Pos Pemeriksaan. | 
| regionName | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| maxLeasesForWorker | LeaseManagementConfig | Jumlah maksimum sewa satu contoh aplikasi harus diterima. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. | 
| initialPositionInStreamExtended | LeaseManagementConfig | Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada. TODO: KinesisEco -438 | 
| shardPrioritization | CoordinatorConfig | Prioritas pecahan mana yang akan digunakan. | 
| shutdownGraceMillis | N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. | 
| timeoutInSeconds | N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. | 
| retryGetRecordsInSeconds | PollingConfig | Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan. | 
| maxGetRecordsThreadPool | PollingConfig | Ukuran kolam benang yang digunakan untuk GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya. | 
| recordsFetcherFactory | PollingConfig | Memungkinkan penggantian pabrik yang digunakan untuk membuat fetcher yang mengambil dari aliran. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Jumlah milidetik untuk menunggu di antara panggilan ke ListShards saat kegagalan terjadi. | 
| maxListShardsRetryAttempts | RetrievalConfig | Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah. | 

## Penghapusan waktu idle
<a name="idle-time-removal"></a>

Dalam versi 1.x dari KCL, yang `idleTimeBetweenReadsInMillis` berhubungan dengan dua kuantitas: 
+ Jumlah waktu antara pemeriksaan pengiriman tugas. Anda sekarang dapat mengonfigurasi waktu ini di antara tugas dengan mengatur`CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ Jumlah waktu untuk tidur ketika tidak ada catatan yang dikembalikan dari Kinesis Data Streams. Dalam versi 2.0, catatan fan-out yang disempurnakan didorong dari retriever masing-masing. Aktivitas pada konsumen shard hanya terjadi ketika permintaan yang didorong tiba. 

## Penghapusan konfigurasi klien
<a name="client-configuration-removals"></a>

Di versi 2.0, KCL tidak lagi menciptakan klien. Itu tergantung pada pengguna untuk menyediakan klien yang valid. Dengan perubahan ini, semua parameter konfigurasi yang mengontrol pembuatan klien telah dihapus. Jika Anda membutuhkan parameter ini, Anda dapat mengaturnya pada klien sebelum memberikan klien`ConfigsBuilder`.


****  

| Bidang yang Dihapus | Konfigurasi Setara | 
| --- | --- | 
| kinesisEndpoint | Konfigurasikan SDK KinesisAsyncClient dengan titik akhir pilihan:. KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build() | 
| dynamoDBEndpoint | Konfigurasikan SDK DynamoDbAsyncClient dengan titik akhir pilihan:. DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build() | 
| kinesisClientConfig | Konfigurasikan SDK KinesisAsyncClient dengan konfigurasi yang diperlukan:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Konfigurasikan SDK DynamoDbAsyncClient dengan konfigurasi yang diperlukan:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Konfigurasikan SDK CloudWatchAsyncClient dengan konfigurasi yang diperlukan:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Konfigurasikan SDK dengan Wilayah pilihan. Ini sama untuk semua klien SDK. Misalnya, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 