Pilih preferensi cookie Anda

Kami menggunakan cookie penting serta alat serupa yang diperlukan untuk menyediakan situs dan layanan. Kami menggunakan cookie performa untuk mengumpulkan statistik anonim sehingga kami dapat memahami cara pelanggan menggunakan situs dan melakukan perbaikan. Cookie penting tidak dapat dinonaktifkan, tetapi Anda dapat mengklik “Kustom” atau “Tolak” untuk menolak cookie performa.

Jika Anda setuju, AWS dan pihak ketiga yang disetujui juga akan menggunakan cookie untuk menyediakan fitur situs yang berguna, mengingat preferensi Anda, dan menampilkan konten yang relevan, termasuk iklan yang relevan. Untuk menerima atau menolak semua cookie yang tidak penting, klik “Terima” atau “Tolak”. Untuk membuat pilihan yang lebih detail, klik “Kustomisasi”.

Migrasikan konsumen dari KCL 1.x ke KCL 2.x

Mode fokus
Migrasikan konsumen dari KCL 1.x ke KCL 2.x - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

penting

Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami sangat menyarankan Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman Perpustakaan Klien Amazon Kinesis di. GitHub Untuk informasi tentang versi KCL terbaru, lihatGunakan Perpustakaan Klien Kinesis. Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. Migrasi dari KCL 1.x ke KCL 3.x

Topik ini menjelaskan perbedaan antara versi 1.x dan 2.x dari Kinesis Client Library (KCL). Ini juga menunjukkan kepada Anda cara memigrasikan konsumen Anda dari versi 1.x ke versi 2.x dari KCL. Setelah Anda memigrasikan klien Anda, klien akan mulai memproses catatan dari lokasi terakhir yang diperiksa.

Versi 2.0 dari KCL memperkenalkan perubahan antarmuka berikut:

Perubahan Antarmuka KCL
Antarmuka KCL 1.x Antarmuka KCL 2.0
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Dilipat menjadi software.amazon.kinesis.processor.ShardRecordProcessor

Migrasikan prosesor rekaman

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Untuk memigrasikan kelas prosesor rekaman
  1. Ubah antarmuka dari com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor dan com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware kesoftware.amazon.kinesis.processor.ShardRecordProcessor, sebagai berikut:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Perbarui import pernyataan untuk initialize dan processRecords metode.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Ganti shutdown metode dengan metode baru berikut:leaseLost,shardEnded, danshutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Berikut ini adalah versi terbaru dari kelas prosesor rekaman.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migrasikan pabrik prosesor rekaman

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Untuk memigrasi pabrik prosesor rekaman
  1. Ubah antarmuka yang diimplementasikan dari com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory kesoftware.amazon.kinesis.processor.ShardRecordProcessorFactory, sebagai berikut.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Ubah tanda tangan kembali untukcreateProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Berikut ini adalah contoh pabrik prosesor rekaman di 2.0:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migrasi pekerja

Dalam versi 2.0 dari KCL, kelas baru, yang disebutScheduler, menggantikan kelas. Worker Berikut ini adalah contoh pekerja KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Untuk memigrasikan pekerja
  1. Ubah import pernyataan untuk Worker kelas ke pernyataan impor untuk Scheduler dan ConfigsBuilder kelas.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Buat ConfigsBuilder dan a Scheduler seperti yang ditunjukkan pada contoh berikut.

    Disarankan agar Anda menggunakan KinesisClientUtil untuk membuat KinesisAsyncClient dan mengkonfigurasi maxConcurrencyKinesisAsyncClient.

    penting

    Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi KinesisAsyncClient untuk memiliki cukup maxConcurrency tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan. KinesisAsyncClient

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Konfigurasikan klien Amazon Kinesis

Dengan rilis 2.0 dari Kinesis Client Library, konfigurasi klien dipindahkan dari kelas konfigurasi tunggal (KinesisClientLibConfiguration) ke enam kelas konfigurasi. Tabel berikut menjelaskan migrasi.

Bidang Konfigurasi dan Kelas Baru
Bidang Asli Kelas Konfigurasi Baru Deskripsi
applicationName ConfigsBuilder Nama untuk ini aplikasi KCL. Digunakan sebagai default untuk tableName danconsumerName.
tableName ConfigsBuilder Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB.
streamName ConfigsBuilder Nama aliran tempat aplikasi ini memproses catatan dari.
kinesisEndpoint ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
dynamoDBEndpoint ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
initialPositionInStreamExtended RetrievalConfig Lokasi di pecahan tempat KCL mulai mengambil catatan, dimulai dengan proses awal aplikasi.
kinesisCredentialsProvider ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
dynamoDBCredentialsProvider ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
cloudWatchCredentialsProvider ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
failoverTimeMillis LeaseManagementConfig Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal.
workerIdentifier ConfigsBuilder Pengidentifikasi unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik.
shardSyncIntervalMillis LeaseManagementConfig Waktu antara panggilan sinkronisasi shard.
maxRecords PollingConfig Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan.
idleTimeBetweenReadsInMillis CoordinatorConfig Opsi ini telah dihapus. Lihat Penghapusan Waktu Idle.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis.
parentShardPollIntervalMillis CoordinatorConfig Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai.
cleanupLeasesUponShardCompletion LeaseManagementConfig Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses.
ignoreUnexpectedChildShards LeaseManagementConfig Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams.
kinesisClientConfig ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
dynamoDBClientConfig ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
cloudWatchClientConfig ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
taskBackoffTimeMillis LifecycleConfig Waktu untuk menunggu untuk mencoba kembali tugas yang gagal.
metricsBufferTimeMillis MetricsConfig Mengontrol penerbitan CloudWatch metrik.
metricsMaxQueueSize MetricsConfig Mengontrol penerbitan CloudWatch metrik.
metricsLevel MetricsConfig Mengontrol penerbitan CloudWatch metrik.
metricsEnabledDimensions MetricsConfig Mengontrol penerbitan CloudWatch metrik.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Opsi ini telah dihapus. Lihat Validasi Nomor Urutan Pos Pemeriksaan.
regionName ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
maxLeasesForWorker LeaseManagementConfig Jumlah maksimum sewa satu contoh aplikasi harus diterima.
maxLeasesToStealAtOneTime LeaseManagementConfig Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu.
initialLeaseTableReadCapacity LeaseManagementConfig DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru.
initialLeaseTableWriteCapacity LeaseManagementConfig DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru.
initialPositionInStreamExtended LeaseManagementConfig Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada. TODO: KinesisEco -438
shardPrioritization CoordinatorConfig Prioritas pecahan mana yang akan digunakan.
shutdownGraceMillis N/A Opsi ini telah dihapus. Lihat MultiLang Penghapusan.
timeoutInSeconds N/A Opsi ini telah dihapus. Lihat MultiLang Penghapusan.
retryGetRecordsInSeconds PollingConfig Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan.
maxGetRecordsThreadPool PollingConfig Ukuran kolam benang yang digunakan untuk GetRecords.
maxLeaseRenewalThreads LeaseManagementConfig Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya.
recordsFetcherFactory PollingConfig Memungkinkan penggantian pabrik yang digunakan untuk membuat fetcher yang mengambil dari aliran.
logWarningForTaskAfterMillis LifecycleConfig Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai.
listShardsBackoffTimeInMillis RetrievalConfig Jumlah milidetik untuk menunggu di antara panggilan ListShards ketika kegagalan terjadi.
maxListShardsRetryAttempts RetrievalConfig Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah.

Penghapusan waktu idle

Dalam versi 1.x dari KCL, yang idleTimeBetweenReadsInMillis berhubungan dengan dua kuantitas:

  • Jumlah waktu antara pemeriksaan pengiriman tugas. Anda sekarang dapat mengonfigurasi waktu ini di antara tugas dengan mengaturCoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • Jumlah waktu untuk tidur ketika tidak ada catatan yang dikembalikan dari Kinesis Data Streams. Dalam versi 2.0, catatan fan-out yang disempurnakan didorong dari retriever masing-masing. Aktivitas pada konsumen shard hanya terjadi ketika permintaan yang didorong tiba.

Penghapusan konfigurasi klien

Di versi 2.0, KCL tidak lagi menciptakan klien. Itu tergantung pada pengguna untuk menyediakan klien yang valid. Dengan perubahan ini, semua parameter konfigurasi yang mengontrol pembuatan klien telah dihapus. Jika Anda membutuhkan parameter ini, Anda dapat mengaturnya pada klien sebelum memberikan klienConfigsBuilder.

Bidang yang Dihapus Konfigurasi Setara
kinesisEndpoint Konfigurasikan SDK KinesisAsyncClient dengan titik akhir pilihan:. KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()
dynamoDBEndpoint Konfigurasikan SDK DynamoDbAsyncClient dengan titik akhir pilihan:. DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()
kinesisClientConfig Konfigurasikan SDK KinesisAsyncClient dengan konfigurasi yang diperlukan:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Konfigurasikan SDK DynamoDbAsyncClient dengan konfigurasi yang diperlukan:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Konfigurasikan SDK CloudWatchAsyncClient dengan konfigurasi yang diperlukan:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Konfigurasikan SDK dengan Region pilihan. Ini sama untuk semua klien SDK. Misalnya, KinesisAsyncClient.builder().region(Region.US_WEST_2).build().
PrivasiSyarat situsPreferensi cookie
© 2025, Amazon Web Services, Inc. atau afiliasinya. Semua hak dilindungi undang-undang.