

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon DynamoDB 專用 Kinesis Data Streams 入門
<a name="kds_gettingstarted"></a>

本節說明如何搭配 Amazon DynamoDB 主控台、 AWS Command Line Interface (AWS CLI) 和 API 使用 Amazon DynamoDB 資料表的 Kinesis Data Streams。

## 建立作用中的 Amazon Kinesis 資料串流
<a name="kds_gettingstarted.making-changes"></a>

所有這些範例都使用 `Music` DynamoDB 資料表，該資料表是在 [DynamoDB 入門](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html)教學課程中建立的。

若要進一步了解如何建置取用者並將 Kinesis 資料串流連線至其他 AWS 服務，請參閱《Amazon Kinesis Data Streams 開發人員指南》**中的[從 Kinesis Data Streams 讀取資料](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)。

**注意**  
 第一次使用 KDS 碎片時，建議您將碎片設定為隨著使用模式向上擴展和縮減。累積更多使用模式的相關資料後，您可以調整串流中的碎片以進行配對。

------
#### [ Console ]

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/) 的 Kinesis 主控台。

1. 選擇 **Create data stream** (建立資料串流)，並依照說明來建立名為 `samplestream` 的串流。

1. 請在 [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/) 開啟 DynamoDB 主控台。

1. 在主控台左側的導覽窗格中，選擇 **Tables** (資料表)。

1. 選擇 **Music** (音樂) 資料表。

1. 選擇 **Exports and streams** (匯出與串流) 索引標籤。

1. (選用) 您可以在 **Amazon Kinesis 資料串流詳細資訊**下，將記錄時間戳記精確度從微秒 (預設) 變更為毫秒。

1. 從下拉式清單選擇 **samplestream** (範例串流)。

1. 選擇 **Turn On** (開啟) 按鈕。

------
#### [ AWS CLI ]

1. 使用 [create-stream 命令](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)建立名為 `samplestream` 的 Kinesis 資料串流。

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   請先參閱 [Kinesis Data Streams 的碎片管理考量事項](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment)，再為 Kinesis 資料串流設定碎片數量。

1. 使用 [describe-stream 命令](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html)確認 Kinesis 串流是否處於作用中狀態且可供使用。

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. 使用 DynamoDB `enable-kinesis-streaming-destination` 命令在 DynamoDB 資料表上啟用 Kinesis 串流功能。將 `stream-arn` 值替換為上一個步驟中傳回的值 `describe-stream`。可選擇性啟用時間戳記值精確度更精細 (微秒級) 的串流，傳回至每筆記錄上。

   啟用微秒級時間戳記精確度串流：

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   或啟用預設時間戳記精確度串流 (毫秒級)：

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. 使用 DynamoDB `describe-kinesis-streaming-destination` 命令確認資料表上的 Kinesis 串流是否處於作用中狀態。

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. 使用 `put-item` 命令將資料寫入 DynamoDB 資料表，如《[DynamoDB 開發人員指南](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html)》中所述。

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. 使用 Kinesis [get-records](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) CLI 命令來擷取 Kinesis 串流內容。然後使用下面的程式碼片段來將串流內容還原序列化。

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. 遵循《Kinesis Data Streams 開發人員指南》中的說明，使用 Java [建立](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html)名為 `samplestream` 的 Kinesis 資料串流。

   請先參閱 [Kinesis Data Streams 的碎片管理考量事項](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment)，再為 Kinesis 資料串流設定碎片數量。

1. 使用以下程式碼片段來啟用 DynamoDB 資料表上的 Kinesis 串流。可選擇性啟用時間戳記值精確度更精細 (微秒級) 的串流，傳回至每筆記錄上。

   啟用微秒級時間戳記精確度串流：

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   或啟用預設時間戳記精確度串流 (毫秒級)：

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. 遵循《Kinesis Data Streams 開發人員指南》中的說明，從建立的資料串流中進行[讀取](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)。

1. 然後使用下面的程式碼片段將串流內容還原序列化

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## 變更作用中的 Amazon Kinesis 資料串流
<a name="kds_gettingstarted.making-changes"></a>

本節說明如何使用 主控台 AWS CLI 和 API 來變更作用中的 Kinesis Data Streams for DynamoDB 設定。

**AWS 管理主控台**

1. 請在 [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/) 開啟 DynamoDB 主控台。

1. 前往您的資料表。

1. 選擇**匯出與串流**。

**AWS CLI**

1. 呼叫 `describe-kinesis-streaming-destination` 以確認串流為 `ACTIVE`。

1. 呼叫 `UpdateKinesisStreamingDestination`，例如在此範例中：

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. 呼叫 `describe-kinesis-streaming-destination` 以確認串流為 `UPDATING`。

1. 定期呼叫 `describe-kinesis-streaming-destination`，直到串流狀態再次變成 `ACTIVE` 為止。時間戳記精確度更新通常需要 5 分鐘才會生效。一旦此狀態更新，表示更新已完成，且新的精確度值將套用至未來的記錄。

1. 使用 `putItem` 寫入資料表。

1. 使用 Kinesis `get-records` 命令來取得串流內容。

1. 確認寫入的 `ApproximateCreationDateTime` 具有所需的精確度。

**Java API**

1. 提供可建構 `UpdateKinesisStreamingDestination` 請求和 `UpdateKinesisStreamingDestination` 回應的程式碼片段。

1. 提供可建構 `DescribeKinesisStreamingDestination` 請求和 `DescribeKinesisStreamingDestination response` 回應的程式碼片段。

1. 定期呼叫 `describe-kinesis-streaming-destination`，直到串流狀態再次變成 `ACTIVE`，表示更新已完成，且新的精確度值將套用至未來的記錄。

1. 執行對資料表的寫入。

1.  從串流讀取並還原序列化串流內容。

1. 確認寫入的 `ApproximateCreationDateTime` 具有所需的精確度。