選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

Amazon DynamoDB 專用 Kinesis Data Streams 入門

焦點模式
Amazon DynamoDB 專用 Kinesis Data Streams 入門 - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本節說明如何將 Amazon DynamoDB 資料表的 Kinesis Data Streams 與 Amazon DynamoDB 主控台、 AWS Command Line Interface (AWS CLI) 和 搭配使用API。

建立作用中的 Amazon Kinesis 資料串流

所有這些範例都使用 Music DynamoDB 資料表,該資料表是在 DynamoDB 入門教學課程中建立的。

若要進一步了解如何建置消費者並將 Kinesis 資料串流連線至其他 AWS 服務,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的從 Kinesis Data Streams 讀取資料

注意

當您第一次使用KDS碎片時,我們建議您設定碎片,以使用模式進行擴展和縮減。累積更多使用模式的相關資料後,您可以調整串流中的碎片以進行配對。

Console
  1. 登入 AWS Management Console ,並在 開啟 Kinesis 主控台https://console.aws.amazon.com/kinesis/

  2. 選擇 Create data stream (建立資料串流),並依照說明來建立名為 samplestream 的串流。

  3. 在 開啟 DynamoDB 主控台https://console.aws.amazon.com/dynamodb/

  4. 在主控台左側的導覽窗格中,選擇 Tables (資料表)。

  5. 選擇 Music (音樂) 資料表。

  6. 選擇 Exports and streams (匯出與串流) 索引標籤。

  7. (選用) 在 Amazon Kinesis 資料串流詳細資訊下,您可以將記錄時間戳記精確度從微秒 (預設) 變更為毫秒。

  8. 從下拉式清單選擇 samplestream (範例串流)。

  9. 選擇開啟按鈕。

AWS CLI
  1. 使用 create-stream 命令建立名為 samplestream 的 Kinesis 資料串流。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。

  2. 使用 describe-stream 命令確認 Kinesis 串流是否處於作用中狀態且可供使用。

    aws kinesis describe-stream --stream-name samplestream
  3. 使用 DynamoDB enable-kinesis-streaming-destination 命令在 DynamoDB 資料表上啟用 Kinesis 串流功能。將 stream-arn 值替換為上一個步驟中傳回的值 describe-stream。或者,啟用每個記錄上傳回的時間戳記值更精細 (微秒) 精度的串流。

    啟用微秒時間戳記精確度的串流:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    或啟用預設時間戳記精確度的串流 (毫秒):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. 使用 DynamoDB describe-kinesis-streaming-destination 命令確認資料表上的 Kinesis 串流是否處於作用中狀態。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. 使用 put-item 命令將資料寫入 DynamoDB 資料表,如《DynamoDB 開發人員指南》中所述。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. 使用 Kinesis get-records CLI命令來擷取 Kinesis 串流內容。然後使用下面的程式碼片段來將串流內容還原序列化。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 遵循《Kinesis Data Streams 開發人員指南》中的說明,使用 Java 建立名為 samplestream 的 Kinesis 資料串流。

    請先參閱 Kinesis Data Streams 的碎片管理考量事項,再為 Kinesis 資料串流設定碎片數量。

  2. 使用以下程式碼片段來啟用 DynamoDB 資料表上的 Kinesis 串流。或者,啟用每個記錄上傳回的時間戳記值更精細 (微秒) 精度的串流。

    啟用微秒時間戳記精確度的串流:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    或啟用預設時間戳記精確度的串流 (毫秒):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 遵循《Kinesis Data Streams 開發人員指南》中的說明,從建立的資料串流中進行讀取

  4. 然後使用下面的程式碼片段將串流內容還原序列化

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
  1. 登入 AWS Management Console ,並在 開啟 Kinesis 主控台https://console.aws.amazon.com/kinesis/

  2. 選擇 Create data stream (建立資料串流),並依照說明來建立名為 samplestream 的串流。

  3. 在 開啟 DynamoDB 主控台https://console.aws.amazon.com/dynamodb/

  4. 在主控台左側的導覽窗格中,選擇 Tables (資料表)。

  5. 選擇 Music (音樂) 資料表。

  6. 選擇 Exports and streams (匯出與串流) 索引標籤。

  7. (選用) 在 Amazon Kinesis 資料串流詳細資訊下,您可以將記錄時間戳記精確度從微秒 (預設) 變更為毫秒。

  8. 從下拉式清單選擇 samplestream (範例串流)。

  9. 選擇開啟按鈕。

變更作用中的 Amazon Kinesis 資料串流

本節說明如何使用 主控台 AWS CLI 和 對作用中的 Kinesis Data Streams for DynamoDB 設定進行變更API。

AWS Management Console

  1. 在 開啟 DynamoDB 主控台 https://console.aws.amazon.com/dynamodb/

  2. 前往您的 資料表。

  3. 選擇匯出和串流

AWS CLI

  1. 呼叫 describe-kinesis-streaming-destination 以確認串流為 ACTIVE

  2. 呼叫 UpdateKinesisStreamingDestination,例如在此範例中:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. 呼叫 describe-kinesis-streaming-destination 以確認串流為 UPDATING

  4. describe-kinesis-streaming-destination 定期呼叫 ,直到串流狀態ACTIVE再次變成為止。時間戳記精確度更新通常需要 5 分鐘才會生效。一旦此狀態更新,表示更新已完成,且新的精確度值將套用至未來的記錄。

  5. 使用 寫入資料表putItem

  6. 使用 Kinesis get-records命令來取得串流內容。

  7. 確認寫入ApproximateCreationDateTime的 具有所需的精確度。

Java API

  1. 提供程式碼片段來建構UpdateKinesisStreamingDestination請求和UpdateKinesisStreamingDestination回應。

  2. 提供程式碼片段來建構DescribeKinesisStreamingDestination請求和 DescribeKinesisStreamingDestination response

  3. describe-kinesis-streaming-destination 定期呼叫 ,直到串流狀態ACTIVE再次出現,表示更新已完成,且新的精確度值將套用至未來的記錄。

  4. 執行對資料表的寫入。

  5. 從串流讀取並還原序列化串流內容。

  6. 確認寫入ApproximateCreationDateTime的 具有所需的精確度。

隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。