

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Migrasikan konsumen dari KCL 1.x ke KCL 2.x
<a name="kcl-migration"></a>

**penting**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami **sangat menyarankan** Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman [Perpustakaan Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat[Gunakan Perpustakaan Klien Kinesis](kcl.md). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat. [Migrasi dari KCL 1.x ke KCL 3.x](kcl-migration-1-3.md)

Topik ini menjelaskan perbedaan antara versi 1.x dan 2.x dari Kinesis Client Library (KCL). Ini juga menunjukkan kepada Anda cara memigrasikan konsumen Anda dari versi 1.x ke versi 2.x dari KCL. Setelah Anda memigrasikan klien Anda, klien akan mulai memproses catatan dari lokasi terakhir yang diperiksa.

Versi 2.0 dari KCL memperkenalkan perubahan antarmuka berikut:


**Perubahan Antarmuka KCL**  

| Antarmuka KCL 1.x | Antarmuka KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Dilipat menjadi software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrasikan prosesor rekaman](#recrod-processor-migration)
+ [Migrasikan pabrik prosesor rekaman](#recrod-processor-factory-migration)
+ [Migrasi pekerja](#worker-migration)
+ [Konfigurasikan klien Amazon Kinesis](#client-configuration)
+ [Penghapusan waktu idle](#idle-time-removal)
+ [Penghapusan konfigurasi klien](#client-configuration-removals)

## Migrasikan prosesor rekaman
<a name="recrod-processor-migration"></a>

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Untuk memigrasikan kelas prosesor rekaman**

1. Ubah antarmuka dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` dan `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` ke`software.amazon.kinesis.processor.ShardRecordProcessor`, sebagai berikut:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Perbarui `import` pernyataan untuk `initialize` dan `processRecords` metode.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Ganti `shutdown` metode dengan metode baru berikut:`leaseLost`,`shardEnded`, dan`shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Berikut ini adalah versi terbaru dari kelas prosesor rekaman.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Migrasikan pabrik prosesor rekaman
<a name="recrod-processor-factory-migration"></a>

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Untuk memigrasi pabrik prosesor rekaman**

1. Ubah antarmuka yang diimplementasikan dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` ke`software.amazon.kinesis.processor.ShardRecordProcessorFactory`, sebagai berikut.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Ubah tanda tangan kembali untuk`createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

Berikut ini adalah contoh pabrik prosesor rekaman di 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrasi pekerja
<a name="worker-migration"></a>

Dalam versi 2.0 dari KCL, kelas baru, yang disebut`Scheduler`, menggantikan kelas. `Worker` Berikut ini adalah contoh pekerja KCL 1.x.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Untuk memigrasikan pekerja**

1. Ubah `import` pernyataan untuk `Worker` kelas ke pernyataan impor untuk `Scheduler` dan `ConfigsBuilder` kelas.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Buat `ConfigsBuilder` dan a `Scheduler` seperti yang ditunjukkan pada contoh berikut.

   Disarankan agar Anda menggunakan `KinesisClientUtil` untuk membuat `KinesisAsyncClient` dan mengkonfigurasi `maxConcurrency``KinesisAsyncClient`.
**penting**  
Klien Amazon Kinesis mungkin melihat peningkatan latensi yang signifikan, kecuali jika Anda mengonfigurasi `KinesisAsyncClient` untuk memiliki cukup `maxConcurrency` tinggi untuk memungkinkan semua sewa ditambah penggunaan tambahan. `KinesisAsyncClient`

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Konfigurasikan klien Amazon Kinesis
<a name="client-configuration"></a>

Dengan rilis 2.0 dari Kinesis Client Library, konfigurasi klien dipindahkan dari kelas konfigurasi tunggal (`KinesisClientLibConfiguration`) ke enam kelas konfigurasi. Tabel berikut menjelaskan migrasi.


**Bidang Konfigurasi dan Kelas Baru Mereka**  

| Bidang Asli | Kelas Konfigurasi Baru | Deskripsi | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | Nama untuk ini aplikasi KCL. Digunakan sebagai default untuk tableName danconsumerName. | 
| tableName | ConfigsBuilder | Memungkinkan penggantian nama tabel yang digunakan untuk tabel sewa Amazon DynamoDB. | 
| streamName | ConfigsBuilder | Nama aliran tempat aplikasi ini memproses catatan dari. | 
| kinesisEndpoint | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| dynamoDBEndpoint | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| initialPositionInStreamExtended | RetrievalConfig | Lokasi di pecahan tempat KCL mulai mengambil catatan, dimulai dengan proses awal aplikasi. | 
| kinesisCredentialsProvider | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| failoverTimeMillis | LeaseManagementConfig | Jumlah milidetik yang harus dilewati sebelum Anda dapat mempertimbangkan pemilik sewa telah gagal. | 
| workerIdentifier | ConfigsBuilder | Pengenal unik yang mewakili instantiasi prosesor aplikasi ini. Ini pasti unik. | 
| shardSyncIntervalMillis | LeaseManagementConfig | Waktu antara panggilan sinkronisasi shard. | 
| maxRecords | PollingConfig | Memungkinkan pengaturan jumlah maksimum catatan yang Kinesis kembalikan. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Opsi ini telah dihapus. Lihat Penghapusan Waktu Idle. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Ketika ditetapkan, prosesor rekaman dipanggil bahkan ketika tidak ada catatan yang diberikan dari Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | Seberapa sering prosesor rekaman harus melakukan polling untuk melihat apakah pecahan induk telah selesai. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Ketika ditetapkan, sewa dihapus segera setelah sewa anak mulai diproses. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Saat diatur, pecahan anak yang memiliki pecahan terbuka diabaikan. Ini terutama untuk DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| dynamoDBClientConfig | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| cloudWatchClientConfig | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| taskBackoffTimeMillis | LifecycleConfig | Waktu untuk menunggu untuk mencoba kembali tugas yang gagal. | 
| metricsBufferTimeMillis | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| metricsMaxQueueSize | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| metricsLevel | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| metricsEnabledDimensions | MetricsConfig | Mengontrol penerbitan CloudWatch metrik. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Opsi ini telah dihapus. Lihat Validasi Nomor Urutan Pos Pemeriksaan. | 
| regionName | ConfigsBuilder | Opsi ini telah dihapus. Lihat Penghapusan Konfigurasi Klien. | 
| maxLeasesForWorker | LeaseManagementConfig | Jumlah maksimum sewa satu contoh aplikasi harus diterima. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | Jumlah maksimum sewa aplikasi harus mencoba untuk mencuri pada satu waktu. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | DynamoDB IOPs membaca yang digunakan jika Kinesis Client Library perlu membuat tabel sewa DynamoDB baru. | 
| initialPositionInStreamExtended | LeaseManagementConfig | Posisi awal dalam aliran tempat aplikasi harus dimulai. Ini hanya digunakan selama pembuatan sewa awal. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Nonaktifkan sinkronisasi data pecahan jika tabel sewa berisi sewa yang ada. TODO: KinesisEco -438 | 
| shardPrioritization | CoordinatorConfig | Prioritas pecahan mana yang akan digunakan. | 
| shutdownGraceMillis | N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. | 
| timeoutInSeconds | N/A | Opsi ini telah dihapus. Lihat MultiLang Penghapusan. | 
| retryGetRecordsInSeconds | PollingConfig | Mengkonfigurasi penundaan antara GetRecords upaya untuk kegagalan. | 
| maxGetRecordsThreadPool | PollingConfig | Ukuran kolam benang yang digunakan untuk GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Mengontrol ukuran kumpulan utas penyewa penyewaan. Semakin banyak sewa yang dapat diambil aplikasi Anda, semakin besar kumpulan ini seharusnya. | 
| recordsFetcherFactory | PollingConfig | Memungkinkan penggantian pabrik yang digunakan untuk membuat fetcher yang mengambil dari aliran. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Berapa lama menunggu sebelum peringatan dicatat jika tugas belum selesai. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Jumlah milidetik untuk menunggu di antara panggilan ke ListShards saat kegagalan terjadi. | 
| maxListShardsRetryAttempts | RetrievalConfig | Jumlah maksimum kali yang ListShards mencoba lagi sebelum menyerah. | 

## Penghapusan waktu idle
<a name="idle-time-removal"></a>

Dalam versi 1.x dari KCL, yang `idleTimeBetweenReadsInMillis` berhubungan dengan dua kuantitas: 
+ Jumlah waktu antara pemeriksaan pengiriman tugas. Anda sekarang dapat mengonfigurasi waktu ini di antara tugas dengan mengatur`CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ Jumlah waktu untuk tidur ketika tidak ada catatan yang dikembalikan dari Kinesis Data Streams. Dalam versi 2.0, catatan fan-out yang disempurnakan didorong dari retriever masing-masing. Aktivitas pada konsumen shard hanya terjadi ketika permintaan yang didorong tiba. 

## Penghapusan konfigurasi klien
<a name="client-configuration-removals"></a>

Di versi 2.0, KCL tidak lagi menciptakan klien. Itu tergantung pada pengguna untuk menyediakan klien yang valid. Dengan perubahan ini, semua parameter konfigurasi yang mengontrol pembuatan klien telah dihapus. Jika Anda membutuhkan parameter ini, Anda dapat mengaturnya pada klien sebelum memberikan klien`ConfigsBuilder`.


****  

| Bidang yang Dihapus | Konfigurasi Setara | 
| --- | --- | 
| kinesisEndpoint | Konfigurasikan SDK KinesisAsyncClient dengan titik akhir pilihan:. KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build() | 
| dynamoDBEndpoint | Konfigurasikan SDK DynamoDbAsyncClient dengan titik akhir pilihan:. DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build() | 
| kinesisClientConfig | Konfigurasikan SDK KinesisAsyncClient dengan konfigurasi yang diperlukan:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Konfigurasikan SDK DynamoDbAsyncClient dengan konfigurasi yang diperlukan:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Konfigurasikan SDK CloudWatchAsyncClient dengan konfigurasi yang diperlukan:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Konfigurasikan SDK dengan Wilayah pilihan. Ini sama untuk semua klien SDK. Misalnya, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 