

# DynamoDB Streams Kinesis Adapter を使用したストリームレコードの処理
<a name="Streams.KCLAdapter"></a>

Amazon DynamoDB からのストリーミングを使用するには、Amazon Kinesis Adapter を使用することをお勧めします。DynamoDB Streams API は、意図的に Kinesis Data Streams と似せています。どちらのサービスでも、データストリーミングは、ストリーミングレコードのコンテナであるシャードで構成されています。両方のサービスの API には `ListStreams`、`DescribeStream`、`GetShards`、および `GetShardIterator` オペレーションが含まれています。(これらの DynamoDB Streams アクションは、Kinesis Data Streams の対応するアクションと似ていますが、100% 同一ではありません)。

DynamoDB Streams ユーザーは、KCL 内のデザインパターンを使用して DynamoDB Streams のシャードとストリーミングレコードを処理できます。これを行うには、DynamoDB Streams Kinesis Adapter を使用します。Kinesis Adapter は Kinesis Data Streams インターフェイスを実装しているため、KCL を使用して DynamoDB Streams からのレコードを消費および処理することができます。DynamoDB Streams Kinesis アダプターのセットアップとインストールの方法については、「[GitHub リポジトリ](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)」を参照してください。

Kinesis Client Library (KCL) を使用して、Kinesis Data Streams のアプリケーションを書き込むことができます。KCL は、低レベルの Kinesis Data Streams API の上で役に立つ抽象化を提供することによりコーディングを簡素化します。KCL の詳細については、「[Amazon Kinesis Data Streams デベロッパーガイド](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」の「*Kinesis Client Library を使用したコンシューマーの開発*」を参照してください。

DynamoDB では、AWS SDK for Java v2.x で KCL バージョン 3.x を使用することをお勧めします。現在の DynamoDB Streams Kinesis Adapter バージョン 1.x と AWS SDK for AWS SDK for Java v1.x は、「[AWS SDK とツールのメンテナンスポリシー](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)」に従って、移行期間中、意図したとおりライフサイクルを通じて引き続き完全にサポートされます。

**注記**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを強くお勧めします。最新の KCL バージョンを確認するには、GitHub の「[Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)」ページを参照してください。最新の KCL バージョンの詳細については、「[Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)」を参照してください。KCL 1.x から KCL 3.x への移行については、「KCL 1.x から KCL 3.x への移行」を参照してください。

次の図表は、これらのライブラリがどのように連携するかを示しています。

![\[DynamoDB Streams レコードを処理するための DynamoDB Streams、Kinesis Data Streams、および KCL 間のインタラクション。\]](http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


DynamoDB Streams Kinesis Adapter を使用すれば、DynamoDB Streams エンドポイントにシームレスに誘導された API コールを使用して、KCL インターフェイスに対して開発を開始できます。

アプリケーションは起動時に KCL をコールしてワーカーをインスタンス化します。ワーカーに、アプリケーションの構成情報 (ストリーミング記述子や AWS 認証情報、および提供するレコードプロセッサクラスの名前など) を提供する必要があります。レコードプロセッサでコードを実行すると、ワーカーは次のタスクを実行します。
+ ストリームに接続する
+ ストリーミング内のシャードを列挙します。
+ ストリーム内の閉じた親シャードの子シャードをチェックして列挙します
+ シャードと他のワーカー (存在する場合) の関連付けを調整する
+ レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する
+ ストリーミングからレコードを取得します。
+ 高スループット時の GetRecords API 呼び出しレートをスケーリングします (キャッチアップモードが設定されている場合)
+ 対応するレコードプロセッサにレコードを送信する
+ 処理されたレコードのチェックポイントを作成する
+ ワーカーのインスタンス数が変化したときに、シャードとワーカーの関連付けを調整する
+ シャードが分割されたときに、シャードとワーカーの関連付けを調整します。

KCL アダプターは、一時的なスループットの増加を処理するための自動呼び出しレート調整機能であるキャッチアップモードをサポートしています。ストリーム処理の遅延が設定可能なしきい値 (デフォルトは 1 分) を超えると、キャッチアップモードでは GetRecords API の呼び出し頻度が設定可能な値 (デフォルトは 3 倍) でスケーリングされ、レコードの取得速度が上がり、遅延が減少すると通常の状態に戻ります。これは、デフォルトのポーリングレートを使用しているコンシューマーが DynamoDB 書き込みアクティビティに圧迫される可能性がある高スループット期間中に役立ちます。キャッチアップモードは、`catchupEnabled` 設定パラメータ (デフォルトは false) を使用して有効にできます。

**注記**  
こちらに記載されている KCL 概念の説明については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Kinesis Client Library を使用したコンシューマーの開発](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」を参照してください。  
AWS Lambda でのストリームの使用の詳細については、「[DynamoDB Streams と AWS Lambda のトリガー](Streams.Lambda.md)」を参照してください。

# KCL 1.x から KCL 3.x への移行
<a name="streams-migrating-kcl"></a>

## 概要
<a name="migrating-kcl-overview"></a>

このガイドでは、コンシューマーアプリケーションを KCL 1.x から KCL 3.x に移行する手順について説明します。KCL 1.x と KCL 3.x のアーキテクチャの違いにより、移行では互換性を確保するために複数のコンポーネントを更新する必要があります。

KCL 1.x は、KCL 3.x とは異なるクラスとインターフェイスを使用します。まずレコードプロセッサ、レコードプロセッサファクトリ、ワーカークラスを KCL 3.x 互換形式に移行し、KCL 1.x から KCL 3.x への移行手順に従う必要があります。

## 移行手順
<a name="migration-steps"></a>

**Topics**
+ [ステップ 1: レコードプロセッサを移行する](#step1-record-processor)
+ [ステップ 2: レコードプロセッサファクトリーを移行する](#step2-record-processor-factory)
+ [ステップ 3: ワーカーを移行する](#step3-worker-migration)
+ [ステップ 4: KCL 3.x 設定の概要と推奨事項](#step4-configuration-migration)
+ [ステップ 5: KCL 2.x から KCL 3.x に移行する](#step5-kcl2-to-kcl3)

### ステップ 1: レコードプロセッサを移行する
<a name="step1-record-processor"></a>

以下の例は、KCL 1.x DynamoDB Streams Kinesis Adapter に実装されたレコードプロセッサを示しています。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**RecordProcessor クラスを移行するには**

1. インターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` および `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` から `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. `initialize` メソッド `processRecords` とメソッドの import ステートメントを更新します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. `shutdownRequested` メソッドを以下の新しいメソッドに置き換えます。`leaseLost`、`shardEnded`、および `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注記**  
DynamoDB Streams Kinesis Adapter は SDKv2 レコードモデルを使用するようになりました。SDKv2 では、複雑な `AttributeValue` オブジェクト (`BS`、`NS`、`M`、`L`、`SS`) が null を返すことはありません。`hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` メソッドを使用して、これらの値が存在するかどうかを確認します。

### ステップ 2: レコードプロセッサファクトリーを移行する
<a name="step2-record-processor-factory"></a>

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**`RecordProcessorFactory` を移行するには**
+ 実装されているインターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` から `software.amazon.kinesis.processor.ShardRecordProcessorFactory` に変更します。以下に例を示します。

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

以下は、3.0 のレコードプロセッサファクトリーの例です。

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### ステップ 3: ワーカーを移行する
<a name="step3-worker-migration"></a>

バージョン 3.0 の KCL では、新しいクラス **Scheduler** によって **Worker** クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**ワーカーを移行するには**

1. `import` クラスの `Worker` ステートメントを `Scheduler` クラスと `ConfigsBuilder` クラスのインポートステートメントに変更します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. `StreamTracker` をインポートし、`StreamsWorkerFactory` のインポートを `StreamsSchedulerFactory` に変更します。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. アプリケーションを起動するポジションを選択します。`TRIM_HORIZON`、`LATEST` のいずれかになります。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. `StreamTracker` インスタンスを作成します。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. `AmazonDynamoDBStreamsAdapterClient` オブジェクトを作成します。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. `ConfigsBuilder` オブジェクトを作成します。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 次の例に示すように、`ConfigsBuilder` を使用して `Scheduler` を作成します。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
この `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 設定は、KCL v2 と v3 ではなく、KCL v3 と KCL v1 の DynamoDB Streams Kinesis Adapter 間の互換性を維持します。

### ステップ 4: KCL 3.x 設定の概要と推奨事項
<a name="step4-configuration-migration"></a>

KCL 3.x に関連する KCL 1.x 後に導入された設定の詳細については、「[KCL 設定](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html)」と「[KCL 移行クライアント設定](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)」を参照してください。

**重要**  
KCL 3.x 以降のバージョンでは、`checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig`、および `retrievalConfig` オブジェクトを直接作成する代わりに、スケジューラの初期化の問題を回避するために、`ConfigsBuilder` を使用して設定を行うことをお勧めします。`ConfigsBuilder` を使用すると、KCL アプリケーションをより柔軟で保守性の高い方法で構成できます。

#### KCL 3.x でデフォルト値を更新する設定
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL バージョン 1.x では、`billingMode` のデフォルト値は `PROVISIONED` に設定されます。ただし、KCL バージョン 3.x では、デフォルトの `billingMode` は `PAY_PER_REQUEST` (オンデマンドモード) です。使用量に基づいて容量を自動的に調整するには、リーステーブルのオンデマンドキャパシティモードを使用することをお勧めします。リーステーブルにプロビジョンドキャパシティを使用するガイダンスについては、「[プロビジョンドキャパシティモードのリーステーブルのベストプラクティス](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)」を参照してください。

`idleTimeBetweenReadsInMillis`  
KCL バージョン 1.x では、`idleTimeBetweenReadsInMillis` のデフォルト値は 1,000 (または 1 秒) に設定されています。KCL バージョン 3.x は i`dleTimeBetweenReadsInMillis` のデフォルト値を 1,500 (または 1.5 秒) に設定しますが、Amazon DynamoDB Streams Kinesis Adapter はデフォルト値を 1,000 (または 1 秒) に上書きします。

#### KCL 3.x の新しい設定
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
この設定では、新しく検出されたシャードが処理を開始するまでの時間間隔を定義し、1.5 × `leaseAssignmentIntervalMillis` として計算されます。この設定が明示的に設定されていない場合、時間間隔はデフォルトで 1.5 × `failoverTimeMillis` に設定されます。新しいシャードの処理には、リーステーブルをスキャンし、リーステーブルのグローバルセカンダリインデックス (GSI) をクエリする必要があります。`leaseAssignmentIntervalMillis` を小さくすると、これらのスキャンおよびクエリオペレーションの頻度が増加し、DynamoDB のコストが高くなります。新しいシャードの処理の遅延を最小限に抑えるために、この値を 2,000 (または 2 秒) に設定することをお勧めします。

`shardConsumerDispatchPollIntervalMillis`  
この設定では、シャードコンシューマーによる連続するポーリング間の間隔を定義して、状態遷移をトリガーします。KCL バージョン 1.x では、この動作は `idleTimeInMillis` パラメータによって制御され、設定可能な設定として公開されませんでした。KCL バージョン 3.x では、KCL バージョン 1.x のセットアップで ` idleTimeInMillis` で使用される値と一致するようにこの設定を行うことをお勧めします。

### ステップ 5: KCL 2.x から KCL 3.x に移行する
<a name="step5-kcl2-to-kcl3"></a>

最新の Kinesis Client Library (KCL) バージョンとのスムーズな移行と互換性を確保するには、[KCL 2.x から KCL 3.x へのアップグレード](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics)に関する移行ガイドの手順のステップ 5～8 に従います。

一般的な KCL 3.x のトラブルシューティングの問題については、「[Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)」を参照してください。

# 以前のバージョンにロールバックする
<a name="kcl-migration-rollback"></a>

このトピックでは、コンシューマーアプリケーションを以前の KCL バージョンにロールバックする方法について説明します。このロールバックプロセスは次の 2 つのステップで構成されています。

1. [KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を実行します。

1. 以前の KCL バージョンコードを再デプロイします。

## ステップ 1: KCL 移行ツールを実行する
<a name="kcl-migration-rollback-step1"></a>

以前の KCL バージョンにロールバックする必要がある場合は、KCL 移行ツールを実行する必要があります。このツールは 2 つの重要なタスクを実行します。
+ これにより、DynamoDB のワーカーメトリクステーブルと呼ばれるメタデータテーブルとリーステーブルのグローバルセカンダリインデックスが削除されます。これらのアーティファクトは KCL 3.x によって作成されますが、以前のバージョンにロールバックするときには必要ありません。
+ これにより、すべてのワーカーが KCL 1.x と互換性のあるモードで実行され、以前の KCL バージョンで使用されていた負荷分散アルゴリズムの使用が開始されます。KCL 3.x の新しい負荷分散アルゴリズムに問題がある場合、この問題はすぐに軽減されます。

**重要**  
DynamoDB のコーディネーター状態テーブルが存在し、移行、ロールバック、ロールフォワードプロセス中に削除されていない必要があります。

**注記**  
コンシューマーアプリケーションのすべてのワーカーが、一度に同じ負荷分散アルゴリズムを使用することが重要です。KCL 移行ツールを使用すると、KCL 3.x コンシューマーアプリケーション内のすべてのワーカーが KCL 1.x 互換モードに切り替えられ、アプリケーションが以前の KCL バージョンにロールバックしている間、すべてのワーカーが同じ負荷分散アルゴリズムを実行します。

[KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)は、[KCL GitHub リポジトリ](https://github.com/awslabs/amazon-kinesis-client/tree/master)のスクリプトディレクトリでダウンロードできます。コーディネーター状態テーブル、ワーカーメトリクステーブル、リーステーブルに書き込むための適切なアクセス許可を持つワーカーまたはホストからスクリプトを実行します。KCL コンシューマーアプリケーションに適切な [IAM アクセス許可](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)が設定されていることを確認します。指定されたコマンドを使用して、KCL アプリケーションごとにスクリプトを 1 回だけ実行します。

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### パラメータ
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
*リージョン*をお客様の AWS リージョンに置き換えてください。

`--application_name`  
このパラメータは、DynamoDB メタデータテーブル (リーステーブル、コーディネーター状態テーブル、ワーカーメトリクステーブル) にデフォルト名を使用している場合に必要です。これらのテーブルにカスタム名を指定している場合は、このパラメータを省略できます。*applicationName* を実際の KCL アプリケーションの名前に置き換えます。カスタム名が指定されていない場合、ツールはこの名前を使用してデフォルトのテーブル名を取得します。

`--lease_table_name`  
このパラメータは、KCL 設定でリーステーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*leaseTableName* をリーステーブルに指定したカスタムテーブル名に置き換えます。

`--coordinator_state_table_name`  
このパラメータは、KCL 設定でコーディネーター状態テーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*coordinatorStateTableName* を、コーディネーター状態テーブルに指定したカスタムテーブル名に置き換えます。

`--worker_metrics_table_name`  
このパラメータは、KCL 設定でワーカーメトリクステーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*workerMetricsTableName* を、ワーカーメトリクステーブルに指定したカスタムテーブル名に置き換えてください。

## ステップ 2: 以前の KCL バージョンでコードを再デプロイする
<a name="kcl-migration-rollback-step2"></a>

**重要**  
KCL 移行ツールによって生成された出力でバージョン 2.x に言及する場合は、KCL バージョン 1.x を参照していると解釈する必要があります。スクリプトを実行しても完全なロールバックは行われず、負荷分散アルゴリズムを KCL バージョン 1.x で使用されるものに切り替えるだけです。

ロールバック用に KCL 移行ツールを実行すると、次のいずれかのメッセージが表示されます。

メッセージ 1  
「ロールバックが完了しました。アプリケーションは 2x と互換性のある機能を実行していました。以前の KCL バージョンでコードをデプロイして、以前のアプリケーションバイナリにロールバックしてください。」  
**必要なアクション:** これは、ワーカーが KCL 1.x 互換モードで実行されていたことを意味します。以前の KCL バージョンのコードをワーカーに再デプロイしてください。

メッセージ 2  
「ロールバックが完了しました。KCL アプリケーションは 3x の機能を実行しており、2x と互換性のある機能にロールバックします。しばらくしても移行が見られない場合は、以前の KCL バージョンでコードをデプロイして、以前のアプリケーションバイナリにロールバックしてください。」  
**必要なアクション:** これはワーカーが KCL 3.x モードで実行され、KCL 移行ツールがすべてのワーカーを KCL 1.x 互換モードに切り替えたことを意味します。以前の KCL バージョンのコードをワーカーに再デプロイしてください。

メッセージ 3  
「アプリケーションは既にロールバックされています。削除できる KCLv3 リソースは、移行でアプリケーションをロールフォワードできるようになるまで料金が発生しないようにクリーンアップされました。」  
**必要なアクション:** これは、ワーカーが既にロールバックされ、KCL 1.x 互換モードで実行されていることを意味します。以前の KCL バージョンのコードをワーカーに再デプロイしてください。

# ロールバック後に KCL 3.x にロールフォワードする
<a name="kcl-migration-rollforward"></a>

このトピックでは、ロールバック後にコンシューマーアプリケーションを KCL 3.x にロールフォワードする方法について説明します。ロールフォワードが必要な場合は、2 ステップのプロセスを完了する必要があります。

1. [KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を実行します。

1. KCL 3.x を使用してコードをデプロイします。

## ステップ 1: KCL 移行ツールを実行する
<a name="kcl-migration-rollforward-step1"></a>

次のコマンドを使用して KCL 移行ツールを実行し、KCL 3.x にロールフォワードします。

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### パラメータ
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
*リージョン*をお客様の AWS リージョンに置き換えてください。

`--application_name`  
このパラメータは、コーディネーター状態テーブルにデフォルト名を使用している場合に必要です。コーディネーター状態テーブルにカスタム名を指定している場合は、このパラメータを省略できます。*applicationName* を実際の KCL アプリケーションの名前に置き換えます。カスタム名が指定されていない場合、ツールはこの名前を使用してデフォルトのテーブル名を取得します。

`--coordinator_state_table_name`  
このパラメータは、KCL 設定でコーディネーター状態テーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*coordinatorStateTableName* を、コーディネーター状態テーブルに指定したカスタムテーブル名に置き換えます。

移行ツールをロールフォワードモードで実行すると、KCL は KCL 3.x に必要な次の DynamoDB リソースを作成します。
+ リーステーブルのグローバルセカンダリインデックス
+ ワーカーメトリクステーブル

## ステップ 2: KCL 3.x を使用してコードをデプロイする
<a name="kcl-migration-rollforward-step2"></a>

ロールフォワードの KCL 移行ツールを実行したら、KCL 3.x を使用してコードをワーカーにデプロイします。移行を完了するには、「[ステップ 8: 移行を完了する](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)」を参照してください。

# チュートリアル: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

このセクションは、Amazon Kinesis Client Library と Amazon DynamoDB Streams Kinesis Adapter を使用する Java アプリケーションのチュートリアルです。アプリケーションには、データレプリケーションの例が表示されます。データレプリケーションでは、1 つのテーブルからの書き込みアクティビティが 2 番目のテーブルに適用され、両方のテーブルの内容が同期されます。ソースコードについては、「[完成したプログラム: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)」を参照してください。

このプログラムでは、次のような処理を実行します。

1. `KCL-Demo-src` と `KCL-Demo-dst` という 2 つの DynamoDB テーブルを作成します。これらの各テーブルでは、ストリームが有効になっています。

1. 項目を追加、更新、削除することで、ソーステーブルで更新アクティビティを生成します。これにより、データがテーブルのストリームに書き込まれます。

1. ストリーミングからレコードを読み込んで、DynamoDB リクエストとして再構築し、ターゲットテーブルにリクエストを適用します。

1. ソーステーブルとターゲットテーブルをスキャンし、内容が同じであることを確認します。

1. テーブルを削除してクリーンアップします。

これらのステップについては次のセクションで説明します。完成したアプリケーションは、チュートリアルの最後に示します。

**Topics**
+ [ステップ 1: DynamoDB テーブルを作成する](#Streams.KCLAdapter.Walkthrough.Step1)
+ [ステップ 2: ソーステーブルに更新アクティビティを生成する](#Streams.KCLAdapter.Walkthrough.Step2)
+ [ステップ 3: ストリームを処理する](#Streams.KCLAdapter.Walkthrough.Step3)
+ [ステップ 4: 両方のテーブルの内容が同じであることを確認する](#Streams.KCLAdapter.Walkthrough.Step4)
+ [ステップ 5：クリーンアップ](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完成したプログラム: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## ステップ 1: DynamoDB テーブルを作成する
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

最初のステップでは、2 つの DynamoDB テーブル (送信元テーブルと送信先テーブル) を作成します。ソーステーブルのストリームにある `StreamViewType` は `NEW_IMAGE` です。これは、このテーブルで項目が変更されると必ず、イメージの "後の" 項目がストリームに書き込まれることを意味します。このようにして、ストリームはテーブル内のすべての書き込みアクティビティを記録します。

次の例は、両方のテーブルを作成するためのコードを示しています。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## ステップ 2: ソーステーブルに更新アクティビティを生成する
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

次のステップでは、ソーステーブルにいくつかの書き込みアクティビティを生成します。このアクティビティの実行中、ソーステーブルのストリームもほぼリアルタイムで更新されます。

アプリケーションは、データを書き込むための `PutItem`、`UpdateItem`、および `DeleteItem` API オペレーションを呼び出すメソッドを持つヘルパークラスを定義します。次の例は、これらのメソッドの使用方法を示しています。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## ステップ 3: ストリームを処理する
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

ここでは、プログラムがストリームの処理を開始します。DynamoDB Streams Kinesis Adapter は、低レベルの DynamoDB Streams コールを行わなくてもコードが KCL を十分に使用できるように、KCL と DynamoDB Streams エンドポイントの間の透過的なレイヤーとして機能します。このプログラムでは次のタスクを実行しています。
+ KCL インターフェイス定義に従ったメソッド（`StreamsRecordProcessor`、`initialize`、`processRecords`）を使用して、レコードプロセッサクラス `shutdown` を定義します。`processRecords` メソッドには、ソーステーブルのストリームからの読み込みとターゲットテーブルへの書き込みに必要なロジックが含まれています。
+ レコードプロセッサクラスのクラスファクトリを定義します（`StreamsRecordProcessorFactory`）。これは、KCL を使用する Java プログラムに必要です。
+ クラスファクトリに関連付けられた新しい KCL `Worker` をインスタンス化します。
+ レコード処理が完了すると、`Worker` をシャットダウンします。

必要に応じて、Streams KCL Adapter 設定でキャッチアップモードを有効にして、ストリーム処理の遅延が 1 分 (デフォルト) を超えたときに GetRecords API 呼び出しレートを 3 倍 (デフォルト) に自動的にスケーリングし、ストリームコンシューマーがテーブル内の高スループットのスパイクを処理できるようにします。

KCL インターフェイス定義の詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Kinesis Client Library を使用したコンシューマーの開発](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」を参照してください。

次の例は、`StreamsRecordProcessor` におけるメインループを示しています。`case` ステートメントは、ストリームレコードに出現する `OperationType` に基づいて、実行するアクションを決定します。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## ステップ 4: 両方のテーブルの内容が同じであることを確認する
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

この時点で、ソーステーブルとターゲットテーブルの内容が同期されています。アプリケーションは、両方のテーブルに対して `Scan` リクエストを発行し、内容が実際に同じであることを確認します。

`DemoHelper` クラスには、低レベルの `ScanTable` API を呼び出す `Scan` メソッドが含まれています。次の例は、その使用方法を示しています。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## ステップ 5：クリーンアップ
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

デモは完了したため、アプリケーションによりソーステーブルとターゲットテーブルが削除されます。次のコード例を参照してください。テーブルが削除されても、そのストリームは最大 24 時間使用可能です。その後、自動的に削除されます。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 完成したプログラム: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

次に、[チュートリアル: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.md) で説明したタスクを実行する、完成した Java プログラムを次に示します。実行すると、次のような出力が表示されます。

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**重要**  
 このプログラムを実行するには、クライアントアプリケーションがポリシーを使用して DynamoDB および Amazon CloudWatch にアクセスできることを確認します。詳細については、「[DynamoDB のアイデンティティベースのポリシー](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies)」を参照してください。

ソースコードは、4 つの `.java` ファイルから構成されています。このプログラムを構築するには、Amazon Kinesis Client Library (KCL) 3.x と AWS SDK for Java v2 を含む次の依存関係を推移的な依存関係として追加します。

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

ソースファイルは次のとおりです。
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```