适用于 Amazon DynamoDB 的 Kinesis Data Streams 入门
本节介绍了如何通过 Amazon DynamoDB 控制台、AWS Command Line Interface(AWS CLI)和 API 将 Kinesis Data Streams 用于 Amazon DynamoDB 表。
创建有效的 Amazon Kinesis 数据流
所有这些示例都使用 DynamoDB 入门教程中创建的 Music
DynamoDB 表。
要了解如何构建使用器并将 Kinesis Data Stream 连接到其他 AWS 服务,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的从 Kinesis Data Streams 读取数据。
当您首次使用 KDS 分片时,建议您将分片设置为根据使用模式横向和纵向扩展。在积累了更多有关使用模式的数据后,您可以调整数据流中的分片以与模式匹配。
- Console
-
- AWS CLI
-
-
使用 create-stream 命令创建名为 samplestream
的 Kinesis 流。
aws kinesis create-stream --stream-name samplestream --shard-count 3
请参阅Kinesis Data Streams 的分片管理注意事项,然后设置 Kinesis 数据流的分片数。
-
使用 describe-stream 命令,检查 Kinesis 流是否处于活动状态并准备好使用。
aws kinesis describe-stream --stream-name samplestream
-
使用 DynamoDB enable-kinesis-streaming-destination
命令,在 DynamoDB 表上启用 Kinesis 数据流。将 stream-arn
值替换为上一步返回的 describe-stream
。(可选)启用在每条记录上返回更精细(微秒)精度时间戳值的流式传输。
启用微秒时间戳精度的流式传输:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
或者启用默认时间戳精度(毫秒)的流式传输:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
使用 DynamoDB describe-kinesis-streaming-destination
命令检查是否在表上激活 Kinesis 流。
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
使用 DynamoDB 开发人员指南介绍的 put-item
命令,将数据写入 DynamoDB 表。
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
使用 Kinesis get-records CLI 命令检索 Kinesis 流内容。然后使用以下代码片段来反序列化流内容。
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
按照 Kinesis Data Streams 开发人员指南中的说明,使用 Java 创建一个名为 samplestream
的https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html Kinesis 数据流。
请参阅Kinesis Data Streams 的分片管理注意事项,然后设置 Kinesis 数据流的分片数。
-
使用以下代码段在DynamoDB 表上启用 Kinesis 数据流。(可选)启用在每条记录上返回更精细(微秒)精度时间戳值的流式传输。
启用微秒时间戳精度的流式传输:
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
或者启用默认时间戳精度(毫秒)的流式传输:
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
按照 Kinesis Data Streams 开发人员指南中的说明进行操作,从创建的数据流读取。
-
然后使用以下代码段来反序列化流内容。
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
更改活动的 Amazon Kinesis Data Streams
本节介绍了如何使用控制台、AWS CLI 和 API 更改活动的 Kinesis Data Streams for DynamoDB 设置。
AWS Management Console
AWS CLI
-
调用 describe-kinesis-streaming-destination
以确认流的状态是 ACTIVE
。
-
调用 UpdateKinesisStreamingDestination
,如以下示例所示:
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
调用 describe-kinesis-streaming-destination
以确认流的状态是 UPDATING
。
-
定期调用 describe-kinesis-streaming-destination
,直到流式传输状态恢复 ACTIVE
。时间戳精度更新通常最多需要 5 分钟的时间才能生效。此状态更新后,即表示更新已完成,新的精度值将应用于未来的记录。
-
使用 putItem
写入表。
-
使用 Kinesis get-records
命令检索流内容。
-
确认写入的 ApproximateCreationDateTime
具有所需的精度。
Java API
-
提供构造 UpdateKinesisStreamingDestination
请求和 UpdateKinesisStreamingDestination
响应的代码段。
-
提供构造 DescribeKinesisStreamingDestination
请求和 DescribeKinesisStreamingDestination response
的代码段。
-
定期调用 describe-kinesis-streaming-destination
,直到流状态恢复 ACTIVE
,这表明更新已完成,新的精度值将应用于未来的记录。
-
向表执行写入操作。
-
从流中读取并反序列化流内容。
-
确认写入的 ApproximateCreationDateTime
具有所需的精度。