

# チュートリアル: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

このセクションは、Amazon Kinesis Client Library と Amazon DynamoDB Streams Kinesis Adapter を使用する Java アプリケーションのチュートリアルです。アプリケーションには、データレプリケーションの例が表示されます。データレプリケーションでは、1 つのテーブルからの書き込みアクティビティが 2 番目のテーブルに適用され、両方のテーブルの内容が同期されます。ソースコードについては、「[完成したプログラム: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)」を参照してください。

このプログラムでは、次のような処理を実行します。

1. `KCL-Demo-src` と `KCL-Demo-dst` という 2 つの DynamoDB テーブルを作成します。これらの各テーブルでは、ストリームが有効になっています。

1. 項目を追加、更新、削除することで、ソーステーブルで更新アクティビティを生成します。これにより、データがテーブルのストリームに書き込まれます。

1. ストリーミングからレコードを読み込んで、DynamoDB リクエストとして再構築し、ターゲットテーブルにリクエストを適用します。

1. ソーステーブルとターゲットテーブルをスキャンし、内容が同じであることを確認します。

1. テーブルを削除してクリーンアップします。

これらのステップについては次のセクションで説明します。完成したアプリケーションは、チュートリアルの最後に示します。

**Topics**
+ [ステップ 1: DynamoDB テーブルを作成する](#Streams.KCLAdapter.Walkthrough.Step1)
+ [ステップ 2: ソーステーブルに更新アクティビティを生成する](#Streams.KCLAdapter.Walkthrough.Step2)
+ [ステップ 3: ストリームを処理する](#Streams.KCLAdapter.Walkthrough.Step3)
+ [ステップ 4: 両方のテーブルの内容が同じであることを確認する](#Streams.KCLAdapter.Walkthrough.Step4)
+ [ステップ 5：クリーンアップ](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完成したプログラム: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## ステップ 1: DynamoDB テーブルを作成する
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

最初のステップでは、2 つの DynamoDB テーブル (送信元テーブルと送信先テーブル) を作成します。ソーステーブルのストリームにある `StreamViewType` は `NEW_IMAGE` です。これは、このテーブルで項目が変更されると必ず、イメージの "後の" 項目がストリームに書き込まれることを意味します。このようにして、ストリームはテーブル内のすべての書き込みアクティビティを記録します。

次の例は、両方のテーブルを作成するためのコードを示しています。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## ステップ 2: ソーステーブルに更新アクティビティを生成する
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

次のステップでは、ソーステーブルにいくつかの書き込みアクティビティを生成します。このアクティビティの実行中、ソーステーブルのストリームもほぼリアルタイムで更新されます。

アプリケーションは、データを書き込むための `PutItem`、`UpdateItem`、および `DeleteItem` API オペレーションを呼び出すメソッドを持つヘルパークラスを定義します。次の例は、これらのメソッドの使用方法を示しています。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## ステップ 3: ストリームを処理する
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

ここでは、プログラムがストリームの処理を開始します。DynamoDB Streams Kinesis Adapter は、低レベルの DynamoDB Streams コールを行わなくてもコードが KCL を十分に使用できるように、KCL と DynamoDB Streams エンドポイントの間の透過的なレイヤーとして機能します。このプログラムでは次のタスクを実行しています。
+ KCL インターフェイス定義に従ったメソッド（`StreamsRecordProcessor`、`initialize`、`processRecords`）を使用して、レコードプロセッサクラス `shutdown` を定義します。`processRecords` メソッドには、ソーステーブルのストリームからの読み込みとターゲットテーブルへの書き込みに必要なロジックが含まれています。
+ レコードプロセッサクラスのクラスファクトリを定義します（`StreamsRecordProcessorFactory`）。これは、KCL を使用する Java プログラムに必要です。
+ クラスファクトリに関連付けられた新しい KCL `Worker` をインスタンス化します。
+ レコード処理が完了すると、`Worker` をシャットダウンします。

必要に応じて、Streams KCL Adapter 設定でキャッチアップモードを有効にして、ストリーム処理の遅延が 1 分 (デフォルト) を超えたときに GetRecords API 呼び出しレートを 3 倍 (デフォルト) に自動的にスケーリングし、ストリームコンシューマーがテーブル内の高スループットのスパイクを処理できるようにします。

KCL インターフェイス定義の詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Kinesis Client Library を使用したコンシューマーの開発](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」を参照してください。

次の例は、`StreamsRecordProcessor` におけるメインループを示しています。`case` ステートメントは、ストリームレコードに出現する `OperationType` に基づいて、実行するアクションを決定します。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## ステップ 4: 両方のテーブルの内容が同じであることを確認する
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

この時点で、ソーステーブルとターゲットテーブルの内容が同期されています。アプリケーションは、両方のテーブルに対して `Scan` リクエストを発行し、内容が実際に同じであることを確認します。

`DemoHelper` クラスには、低レベルの `ScanTable` API を呼び出す `Scan` メソッドが含まれています。次の例は、その使用方法を示しています。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## ステップ 5：クリーンアップ
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

デモは完了したため、アプリケーションによりソーステーブルとターゲットテーブルが削除されます。次のコード例を参照してください。テーブルが削除されても、そのストリームは最大 24 時間使用可能です。その後、自動的に削除されます。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```