

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 逐步解說：DynamoDB Streams Kinesis 轉接器
<a name="Streams.KCLAdapter.Walkthrough"></a>

本節會逐步解說使用 Amazon Kinesis Client Library 與 Amazon DynamoDB Streams Kinesis 轉接器的 Java 應用程式。此應用程式示範資料複寫的範例，將一份資料表的寫入活動套用至第二份資料表，讓兩份資料表的內容保持同步。如需來源碼，請參閱「[完整程式：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)」。

此程式執行下列操作：

1. 建立兩個 DynamoDB 資料表，並命名為 `KCL-Demo-src` 和 `KCL-Demo-dst`。這些資料表各會啟用串流。

1. 在來源資料表中新增、更新與刪除項目，以產生更新活動。這會導致將資料寫入資料表的串流。

1. 從串流讀取紀錄、將其重新建構為 DynamoDB 請求，再將請求套用至目標資料表。

1. 掃描來源與目標資料表，以確定其內容相同。

1. 刪除資料表以清除。

下列章節將說明這些步驟，並在演練結尾顯示完整的應用程式。

**Topics**
+ [步驟 1：建立 DynamoDB 資料表](#Streams.KCLAdapter.Walkthrough.Step1)
+ [步驟 2：在來源資料表中產生更新活動](#Streams.KCLAdapter.Walkthrough.Step2)
+ [步驟 3：處理串流](#Streams.KCLAdapter.Walkthrough.Step3)
+ [步驟 4：確定這兩個資料表的內容相同](#Streams.KCLAdapter.Walkthrough.Step4)
+ [步驟 5：清除](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完整程式：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 步驟 1：建立 DynamoDB 資料表
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

第一步是建立兩個 DynamoDB 資料表：一個來源資料表與一個目標資料表。來源資料表串流中的 `StreamViewType` 是 `NEW_IMAGE`。這表示每當在此資料表中修改項目時，項目的「修改後」影像就會寫入串流。串流可透過此方法追蹤資料表上的所有寫入活動。

下列範例顯示用來建立這兩份資料表的程式碼。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 步驟 2：在來源資料表中產生更新活動
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

下一步是在來源資料表上產生一些寫入活動。在此活動進行期間，來源資料表的串流也會近乎即時地更新。

此應用程式會使用呼叫 `PutItem`、`UpdateItem` 與 `DeleteItem` API 操作的方法，來定義小幫手類別，以寫入資料。下列程式碼範例示範如何使用這些方法。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 步驟 3：處理串流
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

程式現在會開始處理串流。DynamoDB Streams Kinesis 轉接器會作為 KCL 與 DynamoDB Streams 端點之間的透明層，讓程式碼可以充分利用 KCL，而不需要發出低階 DynamoDB Streams 呼叫。此程式會執行下列任務：
+ 它會以遵守 KCL 介面定義的方法 (`StreamsRecordProcessor`、`initialize` 與 `processRecords`) 來定義紀錄處理器類別 `shutdown`。`processRecords` 方法包含從來源資料表串流讀取與寫入目標資料表所需的邏輯。
+ 它會定義紀錄處理器類別的類別處理站 (`StreamsRecordProcessorFactory`)。這是使用 KCL 之 Java 程式的必要任務。
+ 它會執行個體化與類別處理站相關聯的新 KCL `Worker`。
+ 它會在紀錄處理完成時關閉 `Worker`。

或者，在 Streams KCL Adapter 組態中啟用追趕模式，以便在串流處理延遲超過一分鐘 （預設） 時自動將 GetRecords API 呼叫速率擴展 3 倍 （預設），協助您的串流取用者處理資料表中的高輸送量峰值。

若要進一步了解 KCL 介面定義，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

下列程式碼範例顯示 `StreamsRecordProcessor` 中的主迴圈。`case` 陳述式會根據串流紀錄中顯示的 `OperationType` 來決定要執行的動作。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 步驟 4：確定這兩個資料表的內容相同
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

此時，來源與目標資料表的內容會同步。此應用程式會對這兩個資料表發出 `Scan` 請求，以確認其內容事實上相同。

`DemoHelper` 類別包含呼叫低階 `Scan` API 的 `ScanTable` 方法。下列範例示範其使用方法。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 步驟 5：清除
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

示範已完成，因此應用程式會刪除來源與目標資料表。請參閱以下程式碼範例。即使在刪除資料表後，其串流也會保持可用長達 24 小時，然後就自動刪除。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```