本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Amazon DynamoDB 專用 Kinesis Data Streams 入門
本節說明如何將 Amazon DynamoDB 資料表的 Kinesis Data Streams 與 Amazon DynamoDB 主控台、 AWS Command Line Interface (AWS CLI) 和 搭配使用API。
建立作用中的 Amazon Kinesis 資料串流
所有這些範例都使用 Music
DynamoDB 資料表,該資料表是在 DynamoDB 入門教學課程中建立的。
若要進一步了解如何建置消費者並將 Kinesis 資料串流連線至其他 AWS 服務,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的從 Kinesis Data Streams 讀取資料。
當您第一次使用KDS碎片時,建議您將碎片設定為使用模式來擴展和縮減。累積更多使用模式的相關資料後,您可以調整串流中的碎片以進行配對。
- Console
-
- AWS CLI
-
-
使用 create-stream 命令建立名為 samplestream
的 Kinesis 資料串流。
aws kinesis create-stream --stream-name samplestream --shard-count 3
請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。
-
使用 describe-stream 命令確認 Kinesis 串流是否處於作用中狀態且可供使用。
aws kinesis describe-stream --stream-name samplestream
-
使用 DynamoDB enable-kinesis-streaming-destination
命令在 DynamoDB 資料表上啟用 Kinesis 串流功能。將 stream-arn
值替換為上一個步驟中傳回的值 describe-stream
。或者,啟用每個記錄上傳回的時間戳記值更精細 (微秒) 精度的串流。
啟用微秒時間戳記精確度的串流:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
或啟用預設時間戳記精確度的串流 (毫秒):
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
使用 DynamoDB describe-kinesis-streaming-destination
命令確認資料表上的 Kinesis 串流是否處於作用中狀態。
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
使用 put-item
命令將資料寫入 DynamoDB 資料表,如《DynamoDB 開發人員指南》中所述。
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
使用 Kinesis get-records CLI命令來擷取 Kinesis 串流內容。然後使用下面的程式碼片段來將串流內容還原序列化。
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
遵循《Kinesis Data Streams 開發人員指南》中的說明,使用 Java 建立名為 samplestream
的 Kinesis 資料串流。
請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。
-
使用以下程式碼片段來啟用 DynamoDB 資料表上的 Kinesis 串流。或者,啟用每個記錄上傳回的時間戳記值更精細 (微秒) 精度的串流。
啟用微秒時間戳記精確度的串流:
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
或啟用預設時間戳記精確度的串流 (毫秒):
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
遵循《Kinesis Data Streams 開發人員指南》中的說明,從建立的資料串流中進行讀取。
-
然後使用下面的程式碼片段將串流內容還原序列化
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
變更作用中的 Amazon Kinesis 資料串流
本節說明如何使用 主控台 AWS CLI 和 對作用中的 Kinesis Data Streams for DynamoDB 設定進行變更API。
AWS Management Console
AWS CLI
-
呼叫 describe-kinesis-streaming-destination
以確認串流為 ACTIVE
。
-
呼叫 UpdateKinesisStreamingDestination
,例如在此範例中:
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
呼叫 describe-kinesis-streaming-destination
以確認串流為 UPDATING
。
-
describe-kinesis-streaming-destination
定期呼叫,直到串流狀態ACTIVE
再次出現為止。時間戳記精確度更新通常需要 5 分鐘才會生效。一旦此狀態更新,表示更新已完成,且新的精確度值將套用至未來的記錄。
-
使用 寫入資料表putItem
。
-
使用 Kinesis get-records
命令來取得串流內容。
-
確認寫入ApproximateCreationDateTime
的 具有所需的精確度。
Java API
-
提供程式碼片段來建構UpdateKinesisStreamingDestination
請求和UpdateKinesisStreamingDestination
回應。
-
提供程式碼片段來建構DescribeKinesisStreamingDestination
請求和 DescribeKinesisStreamingDestination response
。
-
describe-kinesis-streaming-destination
定期呼叫,直到串流狀態ACTIVE
再次出現,表示更新已完成,且新的精確度值將套用至未來的記錄。
-
執行對資料表的寫入。
-
從串流讀取並取消序列化串流內容。
-
確認寫入ApproximateCreationDateTime
的 具有所需的精確度。