Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Migrasikan konsumen dari KCL 1.x ke 2.x KCL
catatan
Kinesis Client Library (KCL) versi 1.x dan 2.x sudah usang. Kami merekomendasikan migrasi ke KCLversi 3.x, yang menawarkan peningkatan kinerja dan fitur baru. Untuk KCL dokumentasi terbaru dan panduan migrasi, lihatGunakan Perpustakaan Klien Kinesis.
Topik ini menjelaskan perbedaan antara versi 1.x dan 2.x dari Kinesis Client Library (). KCL Ini juga menunjukkan kepada Anda cara memigrasikan konsumen Anda dari versi 1.x ke versi 2.x. KCL Setelah Anda memigrasikan klien Anda, klien akan mulai memproses catatan dari lokasi terakhir yang diperiksa.
Versi 2.0 dari KCL memperkenalkan perubahan antarmuka berikut:
KCLAntarmuka 1.x | KCL2.0 Antarmuka |
---|---|
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
Dilipat menjadi software.amazon.kinesis.processor.ShardRecordProcessor |
Topik
Migrasikan prosesor rekaman
Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk KCL 1.x:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Untuk memigrasikan kelas prosesor rekaman
-
Ubah antarmuka dari
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
dancom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
kesoftware.amazon.kinesis.processor.ShardRecordProcessor
, sebagai berikut:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
Perbarui
import
pernyataan untukinitialize
danprocessRecords
metode.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
Ganti
shutdown
metode dengan metode baru berikut:leaseLost
,shardEnded
, danshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Berikut ini adalah versi terbaru dari kelas prosesor rekaman.
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Migrasikan pabrik prosesor rekaman
Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Untuk memigrasi pabrik prosesor rekaman
-
Ubah antarmuka yang diimplementasikan dari
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
kesoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, sebagai berikut.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
Ubah tanda tangan kembali untuk
createProcessor
.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Berikut ini adalah contoh pabrik prosesor rekaman di 2.0:
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
Migrasi pekerja
Dalam versi 2.0 dariKCL, kelas baru, disebutScheduler
, menggantikan Worker
kelas. Berikut ini adalah contoh pekerja KCL 1.x.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Untuk memigrasikan pekerja
-
Ubah
import
pernyataan untukWorker
kelas ke pernyataan impor untukScheduler
danConfigsBuilder
kelas.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Buat
ConfigsBuilder
dan aScheduler
seperti yang ditunjukkan pada contoh berikut.Disarankan agar Anda menggunakan
KinesisClientUtil
untuk membuatKinesisAsyncClient
dan mengkonfigurasimaxConcurrency
KinesisAsyncClient
.penting
Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi
KinesisAsyncClient
untuk memiliki cukupmaxConcurrency
tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan.KinesisAsyncClient
import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Konfigurasikan klien Amazon Kinesis
Dengan rilis 2.0 dari Kinesis Client Library, konfigurasi klien dipindahkan dari kelas konfigurasi tunggal (KinesisClientLibConfiguration
) ke enam kelas konfigurasi. Tabel berikut menjelaskan migrasi.
Bidang Asli | Kelas Konfigurasi Baru | Deskripsi |
---|---|---|
applicationName |
ConfigsBuilder |
Nama untuk KCL aplikasi ini. Digunakan sebagai default untuk tableName danconsumerName . |
tableName |
ConfigsBuilder |
Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB. |
streamName |
ConfigsBuilder |
Nama aliran tempat aplikasi ini memproses catatan dari. |
kinesisEndpoint |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
dynamoDBEndpoint |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
initialPositionInStreamExtended |
RetrievalConfig |
Lokasi di pecahan dari mana KCL mulai mengambil catatan, dimulai dengan proses awal aplikasi. |
kinesisCredentialsProvider |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
dynamoDBCredentialsProvider |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
cloudWatchCredentialsProvider |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
failoverTimeMillis |
LeaseManagementConfig | Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal. |
workerIdentifier |
ConfigsBuilder |
Pengenal unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik. |
shardSyncIntervalMillis |
LeaseManagementConfig | Waktu antara panggilan sinkronisasi shard. |
maxRecords |
PollingConfig |
Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan. |
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
Opsi ini telah dihapus. Lihat Penghapusan Waktu Idle. |
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis. |
parentShardPollIntervalMillis |
CoordinatorConfig |
Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai. |
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses. |
ignoreUnexpectedChildShards |
LeaseManagementConfig |
Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams. |
kinesisClientConfig |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
dynamoDBClientConfig |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
cloudWatchClientConfig |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
taskBackoffTimeMillis |
LifecycleConfig |
Waktu untuk menunggu untuk mencoba kembali tugas yang gagal. |
metricsBufferTimeMillis |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
metricsMaxQueueSize |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
metricsLevel |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
metricsEnabledDimensions |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
Opsi ini telah dihapus. Lihat Validasi Nomor Urutan Pos Pemeriksaan. |
regionName |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
maxLeasesForWorker |
LeaseManagementConfig |
Jumlah maksimum sewa satu contoh aplikasi harus diterima. |
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu. |
initialLeaseTableReadCapacity |
LeaseManagementConfig |
DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. |
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. |
initialPositionInStreamExtended |
LeaseManagementConfig | Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal. |
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada. TODO: KinesisEco -438 |
shardPrioritization |
CoordinatorConfig |
Prioritas pecahan mana yang akan digunakan. |
shutdownGraceMillis |
N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. |
timeoutInSeconds |
N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. |
retryGetRecordsInSeconds |
PollingConfig |
Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan. |
maxGetRecordsThreadPool |
PollingConfig |
Ukuran kolam benang yang digunakan untuk GetRecords. |
maxLeaseRenewalThreads |
LeaseManagementConfig |
Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya. |
recordsFetcherFactory |
PollingConfig |
Memungkinkan penggantian pabrik yang digunakan untuk membuat fetcher yang mengambil dari aliran. |
logWarningForTaskAfterMillis |
LifecycleConfig |
Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai. |
listShardsBackoffTimeInMillis |
RetrievalConfig |
Jumlah milidetik untuk menunggu di antara panggilan ke ListShards saat kegagalan terjadi. |
maxListShardsRetryAttempts |
RetrievalConfig |
Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah. |
Penghapusan waktu idle
Dalam versi 1.xKCL, yang idleTimeBetweenReadsInMillis
berhubungan dengan dua kuantitas:
-
Jumlah waktu antara pemeriksaan pengiriman tugas. Anda sekarang dapat mengonfigurasi waktu ini di antara tugas dengan mengatur
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
. -
Jumlah waktu untuk tidur ketika tidak ada catatan yang dikembalikan dari Kinesis Data Streams. Dalam versi 2.0, catatan fan-out yang disempurnakan didorong dari retriever masing-masing. Aktivitas pada konsumen shard hanya terjadi ketika permintaan yang didorong tiba.
Penghapusan konfigurasi klien
Dalam versi 2.0, KCL tidak lagi menciptakan klien. Itu tergantung pada pengguna untuk menyediakan klien yang valid. Dengan perubahan ini, semua parameter konfigurasi yang mengontrol pembuatan klien telah dihapus. Jika Anda membutuhkan parameter ini, Anda dapat mengaturnya pada klien sebelum memberikan klienConfigsBuilder
.
Bidang yang Dihapus | Konfigurasi Setara |
---|---|
kinesisEndpoint |
Konfigurasikan titik akhir SDK KinesisAsyncClient dengan pilihan:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() . |
dynamoDBEndpoint |
Konfigurasikan titik akhir SDK DynamoDbAsyncClient dengan pilihan:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() . |
kinesisClientConfig |
Konfigurasikan SDK KinesisAsyncClient dengan konfigurasi yang diperlukan:KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
Konfigurasikan SDK DynamoDbAsyncClient dengan konfigurasi yang diperlukan:DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
Konfigurasikan SDK CloudWatchAsyncClient dengan konfigurasi yang diperlukan:CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
Konfigurasikan SDK dengan Wilayah yang disukai. Ini sama untuk semua SDK klien. Misalnya, KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |