Amazon DynamoDB 專用 Kinesis Data Streams 入門 - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon DynamoDB 專用 Kinesis Data Streams 入門

本節說明如何將 Amazon DynamoDB 資料表的 Kinesis Data Streams 與 Amazon DynamoDB 主控台、 AWS Command Line Interface (AWS CLI) 和 搭配使用API。

建立作用中的 Amazon Kinesis 資料串流

所有這些範例都使用 Music DynamoDB 資料表,該資料表是在 DynamoDB 入門教學課程中建立的。

若要進一步了解如何建置消費者並將 Kinesis 資料串流連線至其他 AWS 服務,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的從 Kinesis Data Streams 讀取資料

注意

當您第一次使用KDS碎片時,建議您將碎片設定為使用模式來擴展和縮減。累積更多使用模式的相關資料後,您可以調整串流中的碎片以進行配對。

Console
  1. 登入 AWS Management Console 並在 開啟 Kinesis 主控台https://console.aws.amazon.com/kinesis/

  2. 選擇 Create data stream (建立資料串流),並依照說明來建立名為 samplestream 的串流。

  3. 在 開啟 DynamoDB 主控台https://console.aws.amazon.com/dynamodb/

  4. 在主控台左側的導覽窗格中,選擇 Tables (資料表)。

  5. 選擇 Music (音樂) 資料表。

  6. 選擇 Exports and streams (匯出與串流) 索引標籤。

  7. (選用) 在 Amazon Kinesis 資料串流詳細資訊 下,您可以將記錄時間戳記精確度從微秒 (預設) 變更為毫秒。

  8. 從下拉式清單選擇 samplestream (範例串流)。

  9. 選擇開啟按鈕。

AWS CLI
  1. 使用 create-stream 命令建立名為 samplestream 的 Kinesis 資料串流。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。

  2. 使用 describe-stream 命令確認 Kinesis 串流是否處於作用中狀態且可供使用。

    aws kinesis describe-stream --stream-name samplestream
  3. 使用 DynamoDB enable-kinesis-streaming-destination 命令在 DynamoDB 資料表上啟用 Kinesis 串流功能。將 stream-arn 值替換為上一個步驟中傳回的值 describe-stream。或者,啟用每個記錄上傳回的時間戳記值更精細 (微秒) 精度的串流。

    啟用微秒時間戳記精確度的串流:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    或啟用預設時間戳記精確度的串流 (毫秒):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. 使用 DynamoDB describe-kinesis-streaming-destination 命令確認資料表上的 Kinesis 串流是否處於作用中狀態。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. 使用 put-item 命令將資料寫入 DynamoDB 資料表,如《DynamoDB 開發人員指南》中所述。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. 使用 Kinesis get-records CLI命令來擷取 Kinesis 串流內容。然後使用下面的程式碼片段來將串流內容還原序列化。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 遵循《Kinesis Data Streams 開發人員指南》中的說明,使用 Java 建立名為 samplestream 的 Kinesis 資料串流。

    請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。

  2. 使用以下程式碼片段來啟用 DynamoDB 資料表上的 Kinesis 串流。或者,啟用每個記錄上傳回的時間戳記值更精細 (微秒) 精度的串流。

    啟用微秒時間戳記精確度的串流:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    或啟用預設時間戳記精確度的串流 (毫秒):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 遵循《Kinesis Data Streams 開發人員指南》中的說明,從建立的資料串流中進行讀取

  4. 然後使用下面的程式碼片段將串流內容還原序列化

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }

變更作用中的 Amazon Kinesis 資料串流

本節說明如何使用 主控台 AWS CLI 和 對作用中的 Kinesis Data Streams for DynamoDB 設定進行變更API。

AWS Management Console

  1. 在 開啟 DynamoDB 主控台 https://console.aws.amazon.com/dynamodb/

  2. 前往您的資料表。

  3. 選擇 匯出和串流

AWS CLI

  1. 呼叫 describe-kinesis-streaming-destination 以確認串流為 ACTIVE

  2. 呼叫 UpdateKinesisStreamingDestination,例如在此範例中:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. 呼叫 describe-kinesis-streaming-destination 以確認串流為 UPDATING

  4. describe-kinesis-streaming-destination 定期呼叫,直到串流狀態ACTIVE再次出現為止。時間戳記精確度更新通常需要 5 分鐘才會生效。一旦此狀態更新,表示更新已完成,且新的精確度值將套用至未來的記錄。

  5. 使用 寫入資料表putItem

  6. 使用 Kinesis get-records命令來取得串流內容。

  7. 確認寫入ApproximateCreationDateTime的 具有所需的精確度。

Java API

  1. 提供程式碼片段來建構UpdateKinesisStreamingDestination請求和UpdateKinesisStreamingDestination回應。

  2. 提供程式碼片段來建構DescribeKinesisStreamingDestination請求和 DescribeKinesisStreamingDestination response

  3. describe-kinesis-streaming-destination 定期呼叫,直到串流狀態ACTIVE再次出現,表示更新已完成,且新的精確度值將套用至未來的記錄。

  4. 執行對資料表的寫入。

  5. 從串流讀取並取消序列化串流內容。

  6. 確認寫入ApproximateCreationDateTime的 具有所需的精確度。