Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
penting
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami sangat menyarankan Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman Perpustakaan Klien Amazon Kinesis
Topik ini menjelaskan perbedaan antara versi 1.x dan 2.x dari Kinesis Client Library (KCL). Ini juga menunjukkan kepada Anda cara memigrasikan konsumen Anda dari versi 1.x ke versi 2.x dari KCL. Setelah Anda memigrasikan klien Anda, klien akan mulai memproses catatan dari lokasi terakhir yang diperiksa.
Versi 2.0 dari KCL memperkenalkan perubahan antarmuka berikut:
Antarmuka KCL 1.x | Antarmuka KCL 2.0 |
---|---|
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
Dilipat menjadi software.amazon.kinesis.processor.ShardRecordProcessor |
Topik
Migrasikan prosesor rekaman
Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk KCL 1.x:
package com.amazonaws.kcl;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
@Override
public void initialize(InitializationInput initializationInput) {
//
// Setup record processor
//
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
//
// Process records, and possibly checkpoint
//
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
try {
checkpointer.checkpoint();
} catch (ShutdownException | InvalidStateException e) {
//
// Swallow exception
//
e.printStackTrace();
}
}
}
Untuk memigrasikan kelas prosesor rekaman
-
Ubah antarmuka dari
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
dancom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
kesoftware.amazon.kinesis.processor.ShardRecordProcessor
, sebagai berikut:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
Perbarui
import
pernyataan untukinitialize
danprocessRecords
metode.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
Ganti
shutdown
metode dengan metode baru berikut:leaseLost
,shardEnded
, danshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Berikut ini adalah versi terbaru dari kelas prosesor rekaman.
package com.amazonaws.kcl;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
public class TestRecordProcessor implements ShardRecordProcessor {
@Override
public void initialize(InitializationInput initializationInput) {
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
//
// Swallow the exception
//
e.printStackTrace();
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
//
// Swallow the exception
//
e.printStackTrace();
}
}
}
Migrasikan pabrik prosesor rekaman
Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x.
package com.amazonaws.kcl;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
public class TestRecordProcessorFactory implements IRecordProcessorFactory {
@Override
public IRecordProcessor createProcessor() {
return new TestRecordProcessor();
}
}
Untuk memigrasi pabrik prosesor rekaman
-
Ubah antarmuka yang diimplementasikan dari
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
kesoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, sebagai berikut.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
Ubah tanda tangan kembali untuk
createProcessor
.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Berikut ini adalah contoh pabrik prosesor rekaman di 2.0:
package com.amazonaws.kcl;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
@Override
public ShardRecordProcessor shardRecordProcessor() {
return new TestRecordProcessor();
}
}
Migrasi pekerja
Dalam versi 2.0 dari KCL, kelas baru, yang disebutScheduler
, menggantikan kelas. Worker
Berikut ini adalah contoh pekerja KCL 1.x.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Untuk memigrasikan pekerja
-
Ubah
import
pernyataan untukWorker
kelas ke pernyataan impor untukScheduler
danConfigsBuilder
kelas.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Buat
ConfigsBuilder
dan aScheduler
seperti yang ditunjukkan pada contoh berikut.Disarankan agar Anda menggunakan
KinesisClientUtil
untuk membuatKinesisAsyncClient
dan mengkonfigurasimaxConcurrency
KinesisAsyncClient
.penting
Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi
KinesisAsyncClient
untuk memiliki cukupmaxConcurrency
tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan.KinesisAsyncClient
import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Konfigurasikan klien Amazon Kinesis
Dengan rilis 2.0 dari Kinesis Client Library, konfigurasi klien dipindahkan dari kelas konfigurasi tunggal (KinesisClientLibConfiguration
) ke enam kelas konfigurasi. Tabel berikut menjelaskan migrasi.
Bidang Asli | Kelas Konfigurasi Baru | Deskripsi |
---|---|---|
applicationName |
ConfigsBuilder |
Nama untuk ini aplikasi KCL. Digunakan sebagai default untuk tableName danconsumerName . |
tableName |
ConfigsBuilder |
Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB. |
streamName |
ConfigsBuilder |
Nama aliran tempat aplikasi ini memproses catatan dari. |
kinesisEndpoint |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
dynamoDBEndpoint |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
initialPositionInStreamExtended |
RetrievalConfig |
Lokasi di pecahan tempat KCL mulai mengambil catatan, dimulai dengan proses awal aplikasi. |
kinesisCredentialsProvider |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
dynamoDBCredentialsProvider |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
cloudWatchCredentialsProvider |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
failoverTimeMillis |
LeaseManagementConfig | Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal. |
workerIdentifier |
ConfigsBuilder |
Pengidentifikasi unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik. |
shardSyncIntervalMillis |
LeaseManagementConfig | Waktu antara panggilan sinkronisasi shard. |
maxRecords |
PollingConfig |
Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan. |
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
Opsi ini telah dihapus. Lihat Penghapusan Waktu Idle. |
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis. |
parentShardPollIntervalMillis |
CoordinatorConfig |
Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai. |
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses. |
ignoreUnexpectedChildShards |
LeaseManagementConfig |
Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams. |
kinesisClientConfig |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
dynamoDBClientConfig |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
cloudWatchClientConfig |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
taskBackoffTimeMillis |
LifecycleConfig |
Waktu untuk menunggu untuk mencoba kembali tugas yang gagal. |
metricsBufferTimeMillis |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
metricsMaxQueueSize |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
metricsLevel |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
metricsEnabledDimensions |
MetricsConfig |
Mengontrol penerbitan CloudWatch metrik. |
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
Opsi ini telah dihapus. Lihat Validasi Nomor Urutan Pos Pemeriksaan. |
regionName |
ConfigsBuilder |
Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. |
maxLeasesForWorker |
LeaseManagementConfig |
Jumlah maksimum sewa satu contoh aplikasi harus diterima. |
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu. |
initialLeaseTableReadCapacity |
LeaseManagementConfig |
DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. |
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. |
initialPositionInStreamExtended |
LeaseManagementConfig | Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal. |
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada. TODO: KinesisEco -438 |
shardPrioritization |
CoordinatorConfig |
Prioritas pecahan mana yang akan digunakan. |
shutdownGraceMillis |
N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. |
timeoutInSeconds |
N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. |
retryGetRecordsInSeconds |
PollingConfig |
Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan. |
maxGetRecordsThreadPool |
PollingConfig |
Ukuran kolam benang yang digunakan untuk GetRecords. |
maxLeaseRenewalThreads |
LeaseManagementConfig |
Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya. |
recordsFetcherFactory |
PollingConfig |
Memungkinkan penggantian pabrik yang digunakan untuk membuat fetcher yang mengambil dari aliran. |
logWarningForTaskAfterMillis |
LifecycleConfig |
Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai. |
listShardsBackoffTimeInMillis |
RetrievalConfig |
Jumlah milidetik untuk menunggu di antara panggilan ListShards ketika kegagalan terjadi. |
maxListShardsRetryAttempts |
RetrievalConfig |
Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah. |
Penghapusan waktu idle
Dalam versi 1.x dari KCL, yang idleTimeBetweenReadsInMillis
berhubungan dengan dua kuantitas:
-
Jumlah waktu antara pemeriksaan pengiriman tugas. Anda sekarang dapat mengonfigurasi waktu ini di antara tugas dengan mengatur
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
. -
Jumlah waktu untuk tidur ketika tidak ada catatan yang dikembalikan dari Kinesis Data Streams. Dalam versi 2.0, catatan fan-out yang disempurnakan didorong dari retriever masing-masing. Aktivitas pada konsumen shard hanya terjadi ketika permintaan yang didorong tiba.
Penghapusan konfigurasi klien
Di versi 2.0, KCL tidak lagi menciptakan klien. Itu tergantung pada pengguna untuk menyediakan klien yang valid. Dengan perubahan ini, semua parameter konfigurasi yang mengontrol pembuatan klien telah dihapus. Jika Anda membutuhkan parameter ini, Anda dapat mengaturnya pada klien sebelum memberikan klienConfigsBuilder
.
Bidang yang Dihapus | Konfigurasi Setara |
---|---|
kinesisEndpoint |
Konfigurasikan SDK KinesisAsyncClient dengan titik akhir pilihan:. KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() |
dynamoDBEndpoint |
Konfigurasikan SDK DynamoDbAsyncClient dengan titik akhir pilihan:. DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() |
kinesisClientConfig |
Konfigurasikan SDK KinesisAsyncClient dengan konfigurasi yang diperlukan:KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
Konfigurasikan SDK DynamoDbAsyncClient dengan konfigurasi yang diperlukan:DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
Konfigurasikan SDK CloudWatchAsyncClient dengan konfigurasi yang diperlukan:CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
Konfigurasikan SDK dengan Region pilihan. Ini sama untuk semua klien SDK. Misalnya, KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |