

# 适用于 Amazon DynamoDB 的 Kinesis Data Streams 入门
<a name="kds_gettingstarted"></a>

本节介绍了如何通过 Amazon DynamoDB 控制台、AWS Command Line Interface（AWS CLI）和 API 将 Kinesis Data Streams 用于 Amazon DynamoDB 表。

## 创建有效的 Amazon Kinesis 数据流
<a name="kds_gettingstarted.making-changes"></a>

所有这些示例都使用 [DynamoDB 入门](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html)教程中创建的 `Music` DynamoDB 表。

要了解如何构建使用器并将 Kinesis Data Stream 连接到其他 AWS 服务，请参阅《Amazon Kinesis Data Streams 开发人员指南》**中的[从 Kinesis Data Streams 读取数据](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)。

**注意**  
 当您首次使用 KDS 分片时，建议您将分片设置为根据使用模式横向和纵向扩展。在积累了更多有关使用模式的数据后，您可以调整数据流中的分片以与模式匹配。

------
#### [ Console ]

1. 登录到 AWS 管理控制台，然后打开 Kinesis 控制台：[https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)。

1. 选择**创建数据流**并按照说明创建一个名为 `samplestream` 的流。

1. 打开 DynamoDB 控制台：[https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)。

1. 在控制台左侧的导航窗格中，选择**表**。

1. 选择 **Music** 表。

1. 选择**导出和流式传输**选项卡。

1. （可选）在 **Amazon Kinesis Data Streams 详细信息**下，您可以将记录时间戳精度从微秒（默认）更改为毫秒。

1. 从下拉列表选择 **samplestream**。

1. 选择**开启**按钮。

------
#### [ AWS CLI ]

1. 使用 [create-stream 命令](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)创建名为 `samplestream` 的 Kinesis 流。

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   请参阅[Kinesis Data Streams 的分片管理注意事项](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment)，然后设置 Kinesis 数据流的分片数。

1. 使用 [describe-stream 命令](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html)，检查 Kinesis 流是否处于活动状态并准备好使用。

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. 使用 DynamoDB `enable-kinesis-streaming-destination` 命令，在 DynamoDB 表上启用 Kinesis 数据流。将 `stream-arn` 值替换为上一步返回的 `describe-stream`。（可选）启用在每条记录上返回更精细（微秒）精度时间戳值的流式传输。

   启用微秒时间戳精度的流式传输：

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   或者启用默认时间戳精度（毫秒）的流式传输：

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. 使用 DynamoDB `describe-kinesis-streaming-destination` 命令检查是否在表上激活 Kinesis 流。

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. 使用 [DynamoDB 开发人员指南](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html)介绍的 `put-item` 命令，将数据写入 DynamoDB 表。

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. 使用 Kinesis [get-records](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) CLI 命令检索 Kinesis 流内容。然后使用以下代码片段来反序列化流内容。

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. 按照 Kinesis Data Streams 开发人员指南中的说明，使用 Java 创建一个名为 `samplestream` 的[https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html) Kinesis 数据流。

   请参阅[Kinesis Data Streams 的分片管理注意事项](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment)，然后设置 Kinesis 数据流的分片数。

1. 使用以下代码段在DynamoDB 表上启用 Kinesis 数据流。（可选）启用在每条记录上返回更精细（微秒）精度时间戳值的流式传输。

   启用微秒时间戳精度的流式传输：

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   或者启用默认时间戳精度（毫秒）的流式传输：

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. 按照 *Kinesis Data Streams 开发人员指南*中的说明进行操作，从创建的数据流[读取](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)。

1. 然后使用以下代码段来反序列化流内容。

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## 更改活动的 Amazon Kinesis Data Streams
<a name="kds_gettingstarted.making-changes"></a>

本节介绍了如何使用控制台、AWS CLI 和 API 更改活动的 Kinesis Data Streams for DynamoDB 设置。

**AWS 管理控制台**

1. 打开 DynamoDB 控制台：[https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)。

1. 转至您的表。

1. 选择**导出和流**。

**AWS CLI**

1. 调用 `describe-kinesis-streaming-destination` 以确认流的状态是 `ACTIVE`。

1. 调用 `UpdateKinesisStreamingDestination`，如以下示例所示：

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. 调用 `describe-kinesis-streaming-destination` 以确认流的状态是 `UPDATING`。

1. 定期调用 `describe-kinesis-streaming-destination`，直到流式传输状态恢复 `ACTIVE`。时间戳精度更新通常最多需要 5 分钟的时间才能生效。此状态更新后，即表示更新已完成，新的精度值将应用于未来的记录。

1. 使用 `putItem` 写入表。

1. 使用 Kinesis `get-records` 命令检索流内容。

1. 确认写入的 `ApproximateCreationDateTime` 具有所需的精度。

**Java API**

1. 提供构造 `UpdateKinesisStreamingDestination` 请求和 `UpdateKinesisStreamingDestination` 响应的代码段。

1. 提供构造 `DescribeKinesisStreamingDestination` 请求和 `DescribeKinesisStreamingDestination response` 的代码段。

1. 定期调用 `describe-kinesis-streaming-destination`，直到流状态恢复 `ACTIVE`，这表明更新已完成，新的精度值将应用于未来的记录。

1. 向表执行写入操作。

1.  从流中读取并反序列化流内容。

1. 确认写入的 `ApproximateCreationDateTime` 具有所需的精度。