本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Amazon DynamoDB 串流做為管道的來源 EventBridge
您可以使用 EventBridge 管道來接收 DynamoDB 串流中的記錄。然後,您可以選擇性地篩選或增強這些記錄,然後再將它們傳送到目標進行處理。您可以在設定管道時選擇 Amazon DynamoDB 串流的特定設定。 EventBridge 當將資料傳送至目的地時,管道會維護資料流中的記錄順序。
重要
停用做為管道來源的 DynamoDB 串流會導致該管道變得無法使用,即使您之後重新啟用串流也是如此。發生這種情況的原因是:
您無法停止、啟動或更新其來源已停用的管道。
建立後,您無法使用新來源更新管道。當您重新啟用 DynamoDB 串流時,該串流會指派一個新的 Amazon 資源名稱 (ARN),且不再與管道相關聯。
如果您確實重新啟用 DynamoDB 串流,則需要使用串流的新管道建立新管道。ARN
範例事件
下列範例事件顯示管道接收的資訊。您可以使用此事件來建立和篩選事件模式,或定義輸入轉換。並非所有欄位都可以篩選。如需有關所能篩選欄位的詳細資訊,請參閱 Amazon EventBridge 管道中的事件過濾。
[ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable", "eventSource": "aws:dynamodb" } ]
輪詢和批次處理串流
EventBridge 輪詢會以每秒四次的基本速率在 DynamoDB 串流中進行記錄碎片。當記錄可用時, EventBridge 處理事件並等待結果。如果處理成功,會 EventBridge 繼續輪詢,直到收到更多記錄為止。
根據預設,只要有記錄可用,就會 EventBridge 叫用管道。如果從來源 EventBridge 讀取的批次中只有一筆記錄,則只會處理一個事件。為避免處理少量記錄,您可讓管道設定批次間隔,要求管道緩衝記錄最長達五分鐘。在處理事件之前,會 EventBridge 繼續從來源讀取記錄,直到收集完整批次、批次處理期間到期或批次達到 6 MB 的有效負載限制為止。
您還可以透過並行處理來自每個碎片的多個批次來增加並行性。 EventBridge 可以同時在每個分片中處理多達 10 個批次。如果增加每個碎片的並行批次數, EventBridge 仍然可以確保在分區索引鍵層級進行按順序處理。
規劃 ParallelizationFactor
設定來同時處理 Kinesis 或 DynamoDB 資料串流的一個碎片與多個管道執行。您可以透過平行化因子從 1 (預設值) 到 10,指定從碎片 EventBridge 輪詢的並行批次數。例如,當您設定ParallelizationFactor
為 2 時,最多可以有 200 個並行 EventBridge 管道執行,以處理 100 個 Kinesis 資料碎片。當資料量急劇波動並且 IteratorAge
高時,這有助於縱向擴展處理輸送量。請注意,如果您使用 Kinesis 彙總,則並行化因子將不起作用。
輪詢和串流開始位置
請注意,建立和更新管道期間的串流輪詢最終會一致。
在建立管道期間,從串流開始輪詢事件可能需要幾分鐘時間。
在更新管道資源輪訓組態期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。
這表示如果您指定 LATEST
當作串流的開始位置,管道可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZON
。
報告批次項目失敗
當使 EventBridge 用和處理來源的串流資料時,依預設,它會檢查點為批次的最高序號,但只有在批次完全成功時才會檢查點。若要避免重新處理失敗批次中成功處理過的訊息,您可以設定擴充或目標,傳回哪些訊息成功,哪些失敗的物件。我們將其稱為部分批次回應。
如需詳細資訊,請參閱 部分批次失敗。
成功與失敗條件
如果您傳回下列任一項目,請 EventBridge 將批次視為完全成功:
空白
batchItemFailure
清單Null
batchItemFailure
清單空白
EventResponse
Null
EventResponse
如果您傳回下列任一項目,則 EventBridge 會將批次視為完全失敗:
空白字串
itemIdentifier
Null
itemIdentifier
具有錯誤金鑰名稱的
itemIdentifier
EventBridge 根據您的重試策略重試失敗。