Migrasikan konsumen dari KCL 1.x ke 2.x KCL - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Migrasikan konsumen dari KCL 1.x ke 2.x KCL

catatan

Kinesis Client Library (KCL) versi 1.x dan 2.x sudah usang. Kami merekomendasikan migrasi ke KCLversi 3.x, yang menawarkan peningkatan kinerja dan fitur baru. Untuk KCL dokumentasi terbaru dan panduan migrasi, lihatGunakan Perpustakaan Klien Kinesis.

Topik ini menjelaskan perbedaan antara versi 1.x dan 2.x dari Kinesis Client Library (). KCL Ini juga menunjukkan kepada Anda cara memigrasikan konsumen Anda dari versi 1.x ke versi 2.x. KCL Setelah Anda memigrasikan klien Anda, klien akan mulai memproses catatan dari lokasi terakhir yang diperiksa.

Versi 2.0 dari KCL memperkenalkan perubahan antarmuka berikut:

KCLPerubahan Antarmuka
KCLAntarmuka 1.x KCL2.0 Antarmuka
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Dilipat menjadi software.amazon.kinesis.processor.ShardRecordProcessor

Migrasikan prosesor rekaman

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Untuk memigrasikan kelas prosesor rekaman
  1. Ubah antarmuka dari com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor dan com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware kesoftware.amazon.kinesis.processor.ShardRecordProcessor, sebagai berikut:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Perbarui import pernyataan untuk initialize dan processRecords metode.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Ganti shutdown metode dengan metode baru berikut:leaseLost,shardEnded, danshutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Berikut ini adalah versi terbaru dari kelas prosesor rekaman.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migrasikan pabrik prosesor rekaman

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Untuk memigrasi pabrik prosesor rekaman
  1. Ubah antarmuka yang diimplementasikan dari com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory kesoftware.amazon.kinesis.processor.ShardRecordProcessorFactory, sebagai berikut.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Ubah tanda tangan kembali untukcreateProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Berikut ini adalah contoh pabrik prosesor rekaman di 2.0:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migrasi pekerja

Dalam versi 2.0 dariKCL, kelas baru, disebutScheduler, menggantikan Worker kelas. Berikut ini adalah contoh pekerja KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Untuk memigrasikan pekerja
  1. Ubah import pernyataan untuk Worker kelas ke pernyataan impor untuk Scheduler dan ConfigsBuilder kelas.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Buat ConfigsBuilder dan a Scheduler seperti yang ditunjukkan pada contoh berikut.

    Disarankan agar Anda menggunakan KinesisClientUtil untuk membuat KinesisAsyncClient dan mengkonfigurasi maxConcurrencyKinesisAsyncClient.

    penting

    Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi KinesisAsyncClient untuk memiliki cukup maxConcurrency tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan. KinesisAsyncClient

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Konfigurasikan klien Amazon Kinesis

Dengan rilis 2.0 dari Kinesis Client Library, konfigurasi klien dipindahkan dari kelas konfigurasi tunggal (KinesisClientLibConfiguration) ke enam kelas konfigurasi. Tabel berikut menjelaskan migrasi.

Bidang Konfigurasi dan Kelas Baru
Bidang Asli Kelas Konfigurasi Baru Deskripsi
applicationName ConfigsBuilder Nama untuk KCL aplikasi ini. Digunakan sebagai default untuk tableName danconsumerName.
tableName ConfigsBuilder Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB.
streamName ConfigsBuilder Nama aliran tempat aplikasi ini memproses catatan dari.
kinesisEndpoint ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
dynamoDBEndpoint ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
initialPositionInStreamExtended RetrievalConfig Lokasi di pecahan dari mana KCL mulai mengambil catatan, dimulai dengan proses awal aplikasi.
kinesisCredentialsProvider ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
dynamoDBCredentialsProvider ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
cloudWatchCredentialsProvider ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
failoverTimeMillis LeaseManagementConfig Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal.
workerIdentifier ConfigsBuilder Pengenal unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik.
shardSyncIntervalMillis LeaseManagementConfig Waktu antara panggilan sinkronisasi shard.
maxRecords PollingConfig Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan.
idleTimeBetweenReadsInMillis CoordinatorConfig Opsi ini telah dihapus. Lihat Penghapusan Waktu Idle.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis.
parentShardPollIntervalMillis CoordinatorConfig Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai.
cleanupLeasesUponShardCompletion LeaseManagementConfig Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses.
ignoreUnexpectedChildShards LeaseManagementConfig Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams.
kinesisClientConfig ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
dynamoDBClientConfig ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
cloudWatchClientConfig ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
taskBackoffTimeMillis LifecycleConfig Waktu untuk menunggu untuk mencoba kembali tugas yang gagal.
metricsBufferTimeMillis MetricsConfig Mengontrol penerbitan CloudWatch metrik.
metricsMaxQueueSize MetricsConfig Mengontrol penerbitan CloudWatch metrik.
metricsLevel MetricsConfig Mengontrol penerbitan CloudWatch metrik.
metricsEnabledDimensions MetricsConfig Mengontrol penerbitan CloudWatch metrik.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Opsi ini telah dihapus. Lihat Validasi Nomor Urutan Pos Pemeriksaan.
regionName ConfigsBuilder Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien.
maxLeasesForWorker LeaseManagementConfig Jumlah maksimum sewa satu contoh aplikasi harus diterima.
maxLeasesToStealAtOneTime LeaseManagementConfig Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu.
initialLeaseTableReadCapacity LeaseManagementConfig DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru.
initialLeaseTableWriteCapacity LeaseManagementConfig DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru.
initialPositionInStreamExtended LeaseManagementConfig Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada. TODO: KinesisEco -438
shardPrioritization CoordinatorConfig Prioritas pecahan mana yang akan digunakan.
shutdownGraceMillis N/A Opsi ini telah dihapus. Lihat MultiLang Penghapusan.
timeoutInSeconds N/A Opsi ini telah dihapus. Lihat MultiLang Penghapusan.
retryGetRecordsInSeconds PollingConfig Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan.
maxGetRecordsThreadPool PollingConfig Ukuran kolam benang yang digunakan untuk GetRecords.
maxLeaseRenewalThreads LeaseManagementConfig Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya.
recordsFetcherFactory PollingConfig Memungkinkan penggantian pabrik yang digunakan untuk membuat fetcher yang mengambil dari aliran.
logWarningForTaskAfterMillis LifecycleConfig Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai.
listShardsBackoffTimeInMillis RetrievalConfig Jumlah milidetik untuk menunggu di antara panggilan ke ListShards saat kegagalan terjadi.
maxListShardsRetryAttempts RetrievalConfig Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah.

Penghapusan waktu idle

Dalam versi 1.xKCL, yang idleTimeBetweenReadsInMillis berhubungan dengan dua kuantitas:

  • Jumlah waktu antara pemeriksaan pengiriman tugas. Anda sekarang dapat mengonfigurasi waktu ini di antara tugas dengan mengaturCoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • Jumlah waktu untuk tidur ketika tidak ada catatan yang dikembalikan dari Kinesis Data Streams. Dalam versi 2.0, catatan fan-out yang disempurnakan didorong dari retriever masing-masing. Aktivitas pada konsumen shard hanya terjadi ketika permintaan yang didorong tiba.

Penghapusan konfigurasi klien

Dalam versi 2.0, KCL tidak lagi menciptakan klien. Itu tergantung pada pengguna untuk menyediakan klien yang valid. Dengan perubahan ini, semua parameter konfigurasi yang mengontrol pembuatan klien telah dihapus. Jika Anda membutuhkan parameter ini, Anda dapat mengaturnya pada klien sebelum memberikan klienConfigsBuilder.

Bidang yang Dihapus Konfigurasi Setara
kinesisEndpoint Konfigurasikan titik akhir SDK KinesisAsyncClient dengan pilihan:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint Konfigurasikan titik akhir SDK DynamoDbAsyncClient dengan pilihan:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build().
kinesisClientConfig Konfigurasikan SDK KinesisAsyncClient dengan konfigurasi yang diperlukan:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Konfigurasikan SDK DynamoDbAsyncClient dengan konfigurasi yang diperlukan:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Konfigurasikan SDK CloudWatchAsyncClient dengan konfigurasi yang diperlukan:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Konfigurasikan SDK dengan Wilayah yang disukai. Ini sama untuk semua SDK klien. Misalnya, KinesisAsyncClient.builder().region(Region.US_WEST_2).build().