

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan adaptor DynamoDB Streams Kinesis untuk memproses catatan aliran
<a name="Streams.KCLAdapter"></a>

Menggunakan Adaptor Amazon Kinesis adalah cara yang disarankan untuk menggunakan aliran dari Amazon DynamoDB. DynamoDB Streams API sengaja mirip dengan Kinesis Data Streams. Di kedua layanan, aliran data terdiri dari pecahan, yang merupakan wadah untuk rekaman aliran. Kedua layanan APIs berisi`ListStreams`,, `DescribeStream``GetShards`, dan `GetShardIterator` operasi. (Meskipun tindakan DynamoDB Streams ini serupa dengan tindakan serupa di Kinesis Data Streams, tindakan tersebut tidak 100 persen identik.)

Sebagai pengguna DynamoDB Streams, Anda dapat menggunakan pola desain yang ditemukan dalam KCL untuk memproses serpihan dan rekaman aliran DynamoDB Streams. Untuk melakukan ini, Anda menggunakan Adaptor DynamoDB Streams Kinesis. Adaptor Kinesis mengimplementasikan antarmuka Kinesis Data Streams sehingga KCL dapat digunakan untuk menggunakan dan memproses catatan dari DynamoDB Streams. [Untuk petunjuk tentang cara mengatur dan menginstal Adaptor Kinesis DynamoDB Streams, lihat repositori. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

Anda dapat menulis aplikasi untuk Kinesis Data Streams menggunakan Kinesis Client Library (KCL). KCL menyederhanakan pengodean dengan menyediakan abstraksi yang berguna di atas Kinesis Data Streams API tingkat rendah. Untuk informasi selengkapnya tentang KCL, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dalam *Panduan Developer Amazon Kinesis Data Streams*.

DynamoDB merekomendasikan penggunaan KCL versi 3.x dengan SDK AWS for Java v2.x. [Adaptor Kinesis DynamoDB Streams versi AWS 1.x saat ini dengan AWS SDK untuk Java SDK untuk v1.x akan terus didukung sepenuhnya sepanjang siklus hidupnya sebagaimana dimaksud selama periode transisi sesuai dengan kebijakan pemeliharaan dan Tools.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**catatan**  
Perpustakaan Klien Amazon Kinesis (KCL) versi 1.x dan 2.x sudah usang. KCL 1.x akan mencapai end-of-support pada 30 Januari 2026. Kami sangat menyarankan Anda memigrasikan aplikasi KCL Anda menggunakan versi 1.x ke versi KCL terbaru sebelum 30 Januari 2026. Untuk menemukan versi KCL terbaru, lihat halaman Perpustakaan [Klien Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) di. GitHub Untuk informasi tentang versi KCL terbaru, lihat [Menggunakan Perpustakaan Klien Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Untuk informasi tentang migrasi dari KCL 1.x ke KCL 3.x, lihat Migrasi dari KCL 1.x ke KCL 3.x.

Diagram berikut menunjukkan bagaimana perpustakaan ini berinteraksi satu sama lain.

![\[Interaksi antara DynamoDB Streams, Kinesis Data Streams, dan KCL untuk memproses rekaman DynamoDB Streams.\]](http://docs.aws.amazon.com/id_id/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Dengan Adaptor Kinesis DynamoDB Streams, Anda dapat mulai mengembangkan antarmuka KCL, dengan panggilan API diarahkan secara mulus ke titik akhir DynamoDB Streams.

Saat aplikasi Anda dimulai, aplikasi akan memanggil KCL untuk membuat instance pekerja. Anda harus memberi pekerja informasi konfigurasi untuk aplikasi, seperti deskriptor aliran dan AWS kredensil, dan nama kelas prosesor rekaman yang Anda berikan. Saat menjalankan kode di pemroses rekaman, pekerja melakukan tugas-tugas berikut:
+ Menghubungkan ke aliran
+ Menghitung pecahan dalam aliran
+ Memeriksa dan menghitung pecahan anak dari pecahan induk tertutup di dalam aliran
+ Mengkoordinasikan asosiasi serpihan dengan pekerja lain (jika ada)
+ Membuat instance pemroses rekaman untuk setiap pecahan yang dikelolanya
+ Menarik catatan dari aliran
+ Menskalakan tingkat panggilan GetRecords API selama throughput tinggi (jika mode catch-up dikonfigurasi)
+ Mendorong rekaman ke pemroses rekaman yang sesuai
+ Catatan yang diproses di pos pemeriksaan
+ Menyeimbangkan asosiasi pekerja pecahan ketika jumlah instans pekerja berubah
+ Menyeimbangkan asosiasi pekerja pecahan saat pecahan dipisahkan

Adaptor KCL mendukung mode catch-up, fitur penyesuaian laju panggilan otomatis untuk menangani peningkatan throughput sementara. Ketika kelambatan pemrosesan aliran melebihi ambang batas yang dapat dikonfigurasi (default satu menit), mode catch-up menskalakan frekuensi panggilan GetRecords API dengan nilai yang dapat dikonfigurasi (default 3x) untuk mengambil catatan lebih cepat, lalu kembali normal setelah jeda turun. Ini berharga selama periode throughput tinggi di mana aktivitas penulisan DynamoDB dapat membanjiri konsumen menggunakan tingkat polling default. Mode catch-up dapat diaktifkan melalui parameter `catchupEnabled` konfigurasi (default false).

**catatan**  
Untuk deskripsi konsep KCL yang tercantum di sini, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) di *Panduan Pengembang Amazon Kinesis Data Streams*.  
Untuk informasi lebih lanjut tentang menggunakan stream dengan lihat AWS Lambda [DynamoDB Streams dan pemicu AWS Lambda](Streams.Lambda.md)

# Migrasi dari KCL 1.x ke KCL 3.x
<a name="streams-migrating-kcl"></a>

## Ikhtisar
<a name="migrating-kcl-overview"></a>

Panduan ini memberikan petunjuk untuk memigrasikan aplikasi konsumen Anda dari KCL 1.x ke KCL 3.x. Karena perbedaan arsitektur antara KCL 1.x dan KCL 3.x, migrasi memerlukan pembaruan beberapa komponen untuk memastikan kompatibilitas.

KCL 1.x menggunakan kelas dan antarmuka yang berbeda dibandingkan dengan KCL 3.x. Anda harus memigrasikan prosesor rekaman, pabrik prosesor rekaman, dan kelas pekerja ke format yang kompatibel dengan KCL 3.x terlebih dahulu, dan ikuti langkah-langkah migrasi untuk migrasi KCL 1.x ke KCL 3.x.

## Langkah migrasi
<a name="migration-steps"></a>

**Topics**
+ [Langkah 1: Migrasikan prosesor rekaman](#step1-record-processor)
+ [Langkah 2: Migrasikan pabrik prosesor rekaman](#step2-record-processor-factory)
+ [Langkah 3: Migrasikan pekerja](#step3-worker-migration)
+ [Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x](#step4-configuration-migration)
+ [Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x](#step5-kcl2-to-kcl3)

### Langkah 1: Migrasikan prosesor rekaman
<a name="step1-record-processor"></a>

Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk adaptor KCL 1.x DynamoDB Streams Kinesis:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Untuk memigrasikan kelas RecordProcessor**

1. Ubah antarmuka dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` dan `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` menjadi `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` sebagai berikut:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Perbarui pernyataan impor untuk `initialize` dan `processRecords` metode:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Ganti `shutdownRequested` metode dengan metode baru berikut:`leaseLost`,`shardEnded`, dan`shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Berikut ini adalah versi terbaru dari kelas prosesor rekaman:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**catatan**  
DynamoDB Streams Kinesis Adapter sekarang menggunakan model Record. SDKv2 Dalam SDKv2, `AttributeValue` objek kompleks (`BS`,`NS`, `M``L`,`SS`) tidak pernah mengembalikan null. Gunakan`hasBs()`,`hasNs()`,`hasM()`,`hasL()`, `hasSs()` metode untuk memverifikasi apakah nilai-nilai ini ada.

### Langkah 2: Migrasikan pabrik prosesor rekaman
<a name="step2-record-processor-factory"></a>

Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Untuk memigrasikan `RecordProcessorFactory`**
+ Ubah antarmuka yang diimplementasikan dari `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` ke`software.amazon.kinesis.processor.ShardRecordProcessorFactory`, sebagai berikut:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Berikut ini adalah contoh pabrik prosesor rekaman di 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Langkah 3: Migrasikan pekerja
<a name="step3-worker-migration"></a>

**Dalam versi 3.0 dari KCL, kelas baru, yang disebut **Scheduler**, menggantikan kelas Worker.** Berikut ini adalah contoh pekerja KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Untuk memigrasikan pekerja**

1. Ubah `import` pernyataan untuk `Worker` kelas ke pernyataan impor untuk `Scheduler` dan `ConfigsBuilder` kelas.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Impor `StreamTracker` dan ubah impor `StreamsWorkerFactory` ke`StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Pilih posisi untuk memulai aplikasi. Bisa jadi `TRIM_HORIZON` atau`LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Buat sebuah `StreamTracker` instance.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Buat `AmazonDynamoDBStreamsAdapterClient` objek.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Buat `ConfigsBuilder` objek.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Buat `Scheduler` menggunakan `ConfigsBuilder` seperti yang ditunjukkan pada contoh berikut:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**penting**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`Pengaturan mempertahankan kompatibilitas antara DynamoDB Streams Kinesis Adapter untuk KCL v3 dan KCL v1, bukan antara KCL v2 dan v3.

### Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x
<a name="step4-configuration-migration"></a>

[Untuk penjelasan rinci tentang konfigurasi yang diperkenalkan setelah KCL 1.x yang relevan di KCL 3.x lihat konfigurasi KCL dan [konfigurasi klien migrasi KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html).](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)

**penting**  
Alih-alih langsung membuat objek`checkpointConfig`,,,`coordinatorConfig`, `processorConfig` dan `leaseManagementConfig` `metricsConfig``retrievalConfig`, kami sarankan menggunakan `ConfigsBuilder` untuk mengatur konfigurasi di KCL 3.x dan versi yang lebih baru untuk menghindari masalah inisialisasi Scheduler. `ConfigsBuilder`menyediakan cara yang lebih fleksibel dan dapat dipelihara untuk mengkonfigurasi aplikasi KCL Anda.

#### Konfigurasi dengan nilai default pembaruan di KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Dalam KCL versi 1.x, nilai default untuk `billingMode` diatur ke. `PROVISIONED` Namun, dengan KCL versi 3.x, defaultnya `billingMode` adalah `PAY_PER_REQUEST` (mode on-demand). Kami menyarankan Anda menggunakan mode kapasitas sesuai permintaan untuk tabel sewa Anda untuk secara otomatis menyesuaikan kapasitas berdasarkan penggunaan Anda. Untuk panduan tentang penggunaan kapasitas yang disediakan untuk tabel sewa Anda, lihat [Praktik terbaik untuk tabel sewa dengan mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html) kapasitas yang disediakan.

`idleTimeBetweenReadsInMillis`  
Dalam KCL versi 1.x, nilai default untuk diatur ke `idleTimeBetweenReadsInMillis` adalah 1.000 (atau 1 detik). KCL versi 3.x menetapkan nilai default `dleTimeBetweenReadsInMillis` untuk i menjadi 1.500 (atau 1,5 detik), tetapi Amazon DynamoDB Streams Kinesis Adapter mengganti nilai default menjadi 1.000 (atau 1 detik).

#### Konfigurasi baru di KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Konfigurasi ini mendefinisikan interval waktu sebelum pecahan yang baru ditemukan mulai diproses, dan dihitung sebagai 1,5 ×. `leaseAssignmentIntervalMillis` Jika pengaturan ini tidak dikonfigurasi secara eksplisit, interval waktu default menjadi 1,5 ×. `failoverTimeMillis` Memproses pecahan baru melibatkan pemindaian tabel sewa dan menanyakan indeks sekunder global (GSI) pada tabel sewa. Menurunkan `leaseAssignmentIntervalMillis` peningkatan frekuensi operasi pemindaian dan kueri ini, menghasilkan biaya DynamoDB yang lebih tinggi. Kami merekomendasikan pengaturan nilai ini ke 2000 (atau 2 detik) untuk meminimalkan keterlambatan dalam memproses pecahan baru.

`shardConsumerDispatchPollIntervalMillis`  
Konfigurasi ini mendefinisikan interval antara jajak pendapat berturut-turut oleh konsumen shard untuk memicu transisi status. Di KCL versi 1.x, perilaku ini dikendalikan oleh `idleTimeInMillis` parameter, yang tidak diekspos sebagai pengaturan yang dapat dikonfigurasi. Dengan KCL versi 3.x, kami sarankan untuk mengatur konfigurasi ini agar sesuai dengan nilai yang digunakan ` idleTimeInMillis` dalam pengaturan KCL versi 1.x Anda.

### Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Untuk memastikan kelancaran transisi dan kompatibilitas dengan versi Kinesis Client Library (KCL) terbaru, ikuti langkah 5-8 dalam petunjuk panduan migrasi untuk [meningkatkan dari KCL 2.x ke KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

[Untuk masalah pemecahan masalah umum KCL 3.x, lihat Memecahkan masalah aplikasi konsumen KCL.](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)

# Putar kembali ke versi KCL sebelumnya
<a name="kcl-migration-rollback"></a>

Topik ini menjelaskan cara mengembalikan aplikasi konsumen Anda ke versi KCL sebelumnya. Proses roll-back terdiri dari dua langkah:

1. Jalankan [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Menerapkan ulang kode versi KCL sebelumnya.

## Langkah 1: Jalankan Alat Migrasi KCL
<a name="kcl-migration-rollback-step1"></a>

Ketika Anda perlu memutar kembali ke versi KCL sebelumnya, Anda harus menjalankan Alat Migrasi KCL. Alat ini melakukan dua tugas penting:
+ Ini menghapus tabel metadata yang disebut tabel metrik pekerja dan indeks sekunder global pada tabel sewa di DynamoDB. Artefak ini dibuat oleh KCL 3.x tetapi tidak diperlukan saat Anda memutar kembali ke versi sebelumnya.
+ Itu membuat semua pekerja berjalan dalam mode yang kompatibel dengan KCL 1.x dan mulai menggunakan algoritma load balancing yang digunakan dalam versi KCL sebelumnya. Jika Anda memiliki masalah dengan algoritme penyeimbangan beban baru di KCL 3.x, ini akan segera mengurangi masalah.

**penting**  
Tabel status koordinator di DynamoDB harus ada dan tidak boleh dihapus selama proses migrasi, rollback, dan rollforward.

**catatan**  
Sangat penting bahwa semua pekerja dalam aplikasi konsumen Anda menggunakan algoritma load balancing yang sama pada waktu tertentu. Alat Migrasi KCL memastikan bahwa semua pekerja di aplikasi konsumen KCL 3.x Anda beralih ke mode yang kompatibel dengan KCL 1.x sehingga semua pekerja menjalankan algoritma load balancing yang sama selama rollback aplikasi ke versi KCL sebelumnya.

Anda dapat mengunduh [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) di direktori skrip repositori [ GitHubKCL](https://github.com/awslabs/amazon-kinesis-client/tree/master). Jalankan skrip dari pekerja atau host dengan izin yang sesuai untuk menulis ke tabel status koordinator, tabel metrik pekerja, dan tabel sewa. Pastikan [izin IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) yang sesuai dikonfigurasi untuk aplikasi konsumen KCL. Jalankan skrip hanya sekali per aplikasi KCL menggunakan perintah yang ditentukan:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameter
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Ganti *region* dengan Anda Wilayah AWS.

`--application_name`  
Parameter ini diperlukan jika Anda menggunakan nama default untuk tabel metadata DynamoDB Anda (tabel sewa, tabel status koordinator, dan tabel metrik pekerja). Jika Anda telah menentukan nama kustom untuk tabel ini, Anda dapat menghilangkan parameter ini. Ganti *applicationName* dengan nama aplikasi KCL Anda yang sebenarnya. Alat ini menggunakan nama ini untuk mendapatkan nama tabel default jika nama kustom tidak disediakan.

`--lease_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel sewa dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *leaseTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel sewa Anda.

`--coordinator_state_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel status koordinator dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *coordinatorStateTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel status koordinator Anda.

`--worker_metrics_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama khusus untuk tabel metrik pekerja dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *workerMetricsTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel metrik pekerja Anda.

## Langkah 2: Menerapkan ulang kode dengan versi KCL sebelumnya
<a name="kcl-migration-rollback-step2"></a>

**penting**  
Setiap penyebutan versi 2.x dalam output yang dihasilkan oleh Alat Migrasi KCL harus ditafsirkan sebagai mengacu pada KCL versi 1.x. Menjalankan skrip tidak melakukan rollback lengkap, itu hanya mengalihkan algoritma load balancing ke yang digunakan dalam KCL versi 1.x.

Setelah menjalankan KCL Migration Tool untuk rollback, Anda akan melihat salah satu pesan berikut:

Pesan 1  
“Rollback selesai. Aplikasi Anda menjalankan fungsionalitas yang kompatibel dengan 2x. Harap kembalikan ke binari aplikasi Anda sebelumnya dengan menerapkan kode dengan versi KCL Anda sebelumnya.”  
**Tindakan yang diperlukan:** Ini berarti pekerja Anda berjalan dalam mode kompatibel KCL 1.x. Menerapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

Pesan 2  
“Rollback selesai. Aplikasi KCL Anda menjalankan fungsionalitas 3x dan akan mengembalikan ke fungsionalitas yang kompatibel dengan 2x. Jika Anda tidak melihat mitigasi setelah periode waktu yang singkat, harap kembalikan ke binari aplikasi Anda sebelumnya dengan menerapkan kode dengan versi KCL Anda sebelumnya.  
**Tindakan yang diperlukan:** Ini berarti pekerja Anda berjalan dalam mode KCL 3.x dan Alat Migrasi KCL mengalihkan semua pekerja ke mode yang kompatibel dengan KCL 1.x. Menerapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

Pesan 3  
“Aplikasi sudah digulung kembali. KCLv3 Sumber daya apa pun yang dapat dihapus dibersihkan untuk menghindari biaya hingga aplikasi dapat digulirkan dengan migrasi.  
**Tindakan yang diperlukan:** Ini berarti bahwa pekerja Anda sudah diputar kembali untuk berjalan dalam mode kompatibel KCL 1.x. Menerapkan ulang kode dengan versi KCL sebelumnya ke pekerja Anda.

# Gulung maju ke KCL 3.x setelah rollback
<a name="kcl-migration-rollforward"></a>

Topik ini menjelaskan cara meneruskan aplikasi konsumen Anda ke KCL 3.x setelah rollback. Ketika Anda perlu maju, Anda harus menyelesaikan proses dua langkah:

1. Jalankan [Alat Migrasi KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Terapkan kode dengan KCL 3.x.

## Langkah 1: Jalankan Alat Migrasi KCL
<a name="kcl-migration-rollforward-step1"></a>

Jalankan KCL Migration Tool dengan perintah berikut untuk maju ke KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameter
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Ganti *region* dengan Anda Wilayah AWS.

`--application_name`  
Parameter ini diperlukan jika Anda menggunakan nama default untuk tabel status koordinator Anda. Jika Anda telah menentukan nama kustom untuk tabel status koordinator, Anda dapat menghilangkan parameter ini. Ganti *applicationName* dengan nama aplikasi KCL Anda yang sebenarnya. Alat ini menggunakan nama ini untuk mendapatkan nama tabel default jika nama kustom tidak disediakan.

`--coordinator_state_table_name`  
Parameter ini diperlukan ketika Anda telah menetapkan nama kustom untuk tabel status koordinator dalam konfigurasi KCL Anda. Jika Anda menggunakan nama tabel default, Anda dapat menghilangkan parameter ini. Ganti *coordinatorStateTableName* dengan nama tabel kustom yang Anda tentukan untuk tabel status koordinator Anda.

Setelah Anda menjalankan alat migrasi dalam mode roll-forward, KCL membuat sumber daya DynamoDB berikut yang diperlukan untuk KCL 3.x:
+ Indeks Sekunder Global pada tabel sewa
+ Tabel metrik pekerja

## Langkah 2: Menyebarkan kode dengan KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Setelah menjalankan Alat Migrasi KCL untuk maju, terapkan kode Anda dengan KCL 3.x ke pekerja Anda. Untuk menyelesaikan migrasi, lihat [Langkah 8: Selesaikan migrasi](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Panduan: Adaptor DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough"></a>

Bagian ini adalah panduan aplikasi Java yang menggunakan Perpustakaan Klien Amazon Kinesis dan Adaptor Amazon DynamoDB Streams Kinesis. Aplikasi ini memperlihatkan contoh replikasi data, di mana aktivitas penulisan dari satu tabel diterapkan ke tabel kedua, dengan konten kedua tabel tetap sinkron. Untuk kode sumber, lihat [Program lengkap: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Program ini melakukan hal berikut:

1. Menciptakan dua tabel DynamoDB bernama `KCL-Demo-src` dan `KCL-Demo-dst`. Masing-masing tabel ini memiliki stream yang diaktifkan.

1. Menghasilkan aktivitas pembaruan dalam tabel sumber dengan menambahkan, memperbarui, dan menghapus item. Hal ini menyebabkan data akan ditulis ke stream tabel.

1. Membaca catatan dari stream, merekonstruksinya sebagai permintaan DynamoDB, dan menerapkan permintaan ke tabel tujuan.

1. Memindai tabel sumber dan tujuan untuk memastikan bahwa isinya identik.

1. Membersihkan dengan menghapus tabel.

Langkah-langkah ini dijelaskan di bagian berikut, dan aplikasi lengkap ditampilkan di akhir panduan.

**Topics**
+ [Langkah 1: Buat tabel DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Langkah 3: Proses alirannya](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Langkah 5: Bersihkan](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Program lengkap: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Langkah 1: Buat tabel DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Langkah pertama adalah membuat dua tabel DynamoDB—tabel sumber dan tabel tujuan. `StreamViewType` pada aliran tabel sumber adalah `NEW_IMAGE`. Ini berarti bahwa setiap kali item diubah dalam tabel ini, gambar "setelah" item tersebut ditulis ke aliran. Dengan cara ini, aliran melacak semua aktivitas penulisan di tabel.

Contoh berikut menunjukkan kode yang digunakan untuk membuat kedua tabel.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Langkah 2: Hasilkan aktivitas pembaruan di tabel sumber
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

Langkah selanjutnya adalah menghasilkan beberapa aktivitas menulis pada tabel sumber. Saat aktivitas ini berlangsung, aliran tabel sumber juga diperbarui hampir secara waktu nyata.

Aplikasi ini mendefinisikan kelas pembantu dengan metode yang memanggil operasi `PutItem`, `UpdateItem`, dan API `DeleteItem` untuk menulis data. Contoh kode berikut menunjukkan bagaimana metode ini digunakan.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Langkah 3: Proses alirannya
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Sekarang program mulai memproses aliran. Adaptor Kinesis DynamoDB Streams bertindak sebagai lapisan transparan antara KCL dan titik akhir DynamoDB Streams, sehingga kode dapat sepenuhnya menggunakan KCL daripada harus melakukan panggilan DynamoDB Streams tingkat rendah. Program ini melakukan tugas-tugas berikut:
+ Ini mendefinisikan kelas prosesor catatan, `StreamsRecordProcessor`, dengan metode yang sesuai dengan definisi antarmuka KCL: `initialize`, `processRecords`, dan `shutdown`. Metode `processRecords` berisi logika yang diperlukan untuk membaca dari stream tabel sumber dan menulis ke tabel tujuan.
+ Ini mendefinisikan sebuah pabrik kelas untuk kelas prosesor catatan (`StreamsRecordProcessorFactory`). Hal ini diperlukan untuk program Java yang menggunakan KCL.
+ Ini menginstanskan KCL `Worker` baru, yang terkait dengan pabrik kelas.
+ Ini mematikan `Worker` saat pemrosesan catatan selesai.

Secara opsional, aktifkan mode catch-up dalam konfigurasi Adaptor KCL Streams Anda untuk secara otomatis menskalakan laju panggilan GetRecords API sebesar 3x (default) saat kelambatan pemrosesan streaming melebihi satu menit (default), membantu konsumen streaming Anda menangani lonjakan throughput tinggi di tabel Anda.

Untuk mempelajari lebih lanjut tentang definisi antarmuka KCL, lihat [Mengembangkan konsumen menggunakan Kinesis client library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) di *Panduan Pengembang Amazon Kinesis Data Streams*. 

Contoh kode berikut menunjukkan loop utama dalam `StreamsRecordProcessor`. Pernyataan `case` menentukan apa tindakan apa yang harus dilakukan, berdasarkan `OperationType` yang muncul dalam catatan stream.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Langkah 4: Pastikan bahwa kedua tabel memiliki isi identik
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

Pada titik ini, isi sumber dan tujuan tabel tersinkronisasi. Aplikasi menerbitkan permintaan `Scan` terhadap kedua tabel untuk memverifikasi bahwa isinya, pada kenyataannya, identik.

Kelas `DemoHelper` berisi metode `ScanTable` yang memanggil API `Scan` tingkat rendah. Contoh berikut menunjukkan cara penggunaannya.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Langkah 5: Bersihkan
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Demo selesai, sehingga aplikasi menghapus tabel sumber dan tujuan. Lihat contoh kode berikut. Bahkan setelah tabel dihapus, alirannya tetap tersedia hingga 24 jam, setelah itu tabel akan dihapus secara otomatis.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Program lengkap: Adaptor DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Berikut ini adalah program Java lengkap yang melakukan tugas yang dijelaskan dalam [Panduan: Adaptor DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.md). Saat Anda menjalankannya, Anda akan melihat output yang serupa dengan yang seperti berikut.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**penting**  
 Untuk menjalankan program ini, pastikan bahwa aplikasi klien memiliki akses ke DynamoDB dan CloudWatch Amazon menggunakan kebijakan. Untuk informasi selengkapnya, lihat [Kebijakan berbasis identitas untuk DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Kode sumber terdiri dari empat `.java` file. Untuk membangun program ini, tambahkan dependensi berikut, yang mencakup Amazon Kinesis Client Library (KCL) 3.x dan SDK AWS for Java v2 sebagai dependensi transitif:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

File sumbernya adalah:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.jawa
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```