DynamoDB Streams 和存留時間 - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

DynamoDB Streams 和存留時間

您可以備份或處理 存留時間 (TTL) 刪除的項目,方法是在資料表上啟用 Amazon DynamoDB Streams 並處理過期項目的串流紀錄。如需詳細資訊,請參閱 讀取及處理串流

串流紀錄包含使用者身分欄位 Records[<index>].userIdentity

存留時間程序在過期後刪除的項目具有下列欄位:

  • Records[<index>].userIdentity.type

    "Service"

  • Records[<index>].userIdentity.principalId

    "dynamodb.amazonaws.com"

注意

當您在全域表中使用 TTL 時,執行 TTL 的區域將會設定userIdentity欄位。複寫刪除作業時,不會在其他區域設定此欄位。

下列 JSON 顯示單一串流紀錄的相關部分。

"Records": [ { ... "userIdentity": { "type": "Service", "principalId": "dynamodb.amazonaws.com" } ... } ]

使用 DynamoDB Streams 和 Lambda 封存 TTL 已刪除的項目

合併 DynamoDB 存留時間 (TTL)DynamoDB StreamsAWS Lambda 可協助簡化封存資料、降低 DynamoDB 儲存成本並降低程式碼複雜性。使用 Lambda 做為串流使用者具有許多優勢,最顯著的是與其他使用者 (例如 Kinesis Client Library (KCL)) 相比,成本降低。透過 Lambda 使用事件時,您不會因為 DynamoDB 串流上的 GetRecords API 呼叫而需要付費,而且 Lambda 可藉由識別串流事件中的 JSON 模式提供事件篩選功能。透過事件模式內容篩選,您最多可以定義五個不同的篩選條件來控制哪些事件要傳送到 Lambda 進行處理。這有助於減少對 Lambda 函數的叫用、簡化程式碼並降低總體成本。

雖然 DynamoDB Streams 包含所有資料修改,例如 CreateModifyRemove 動作,這可能會導致不必要的叫用封存 Lambda 函數。例如,假設有一個資料表每小時有 200 萬個資料修改流入串流中,但其中不到 5% 的項目刪除將在 TTL 程序中過期且需要進行封存。搭配 Lambda 事件來源篩選條件,Lambda 函數每小時只會叫用 100,000 次。使用事件篩選,您只需為所需的叫用付費,而不是在沒有事件篩選的情況下為 200 萬次的叫用付費。

事件篩選套用至 Lambda 事件來源映射,它是一種資源,可從選定的事件 (DynamoDB 串流) 中讀取並叫用 Lambda 函數。在下圖中,您可以看到 Lambda 函數如何透過串流和事件篩選條件使用存留時間已刪除的項目。

透過 TTL 程序刪除的項目會啟動使用串流和事件篩選器的 Lambda 函數。

DynamoDB 存留時間事件篩選條件模式

將以下 JSON 新增至事件來源映射篩選條件,可僅對 TTL 刪除的項目叫用 Lambda 函數:

{ "Filters": [ { "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } } } ] }

建立 AWS Lambda 事件來源對應

使用以下程式碼片段建立已篩選的事件來源映射,您可以將它連接到資料表的 DynamoDB 串流。每個程式碼區塊都包含事件篩選條件模式。

AWS CLI
aws lambda create-event-source-mapping \ --event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \ --batch-size 10 \ --enabled \ --function-name test_func \ --starting-position LATEST \ --filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
Java
LambdaClient client = LambdaClient.builder() .region(Region.EU_WEST_1) .build(); Filter userIdentity = Filter.builder() .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}") .build(); FilterCriteria filterCriteria = FilterCriteria.builder() .filters(userIdentity) .build(); CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder() .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000") .batchSize(10) .enabled(Boolean.TRUE) .functionName("test_func") .startingPosition("LATEST") .filterCriteria(filterCriteria) .build(); try{ CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest); System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn()); }catch (ServiceException e){ System.out.println(e.getMessage()); }
Node
const client = new LambdaClient({ region: "eu-west-1" }); const input = { EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000", BatchSize: 10, Enabled: true, FunctionName: "test_func", StartingPosition: "LATEST", FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] } } const command = new CreateEventSourceMappingCommand(input); try { const results = await client.send(command); console.log(results); } catch (err) { console.error(err); }
Python
session = boto3.session.Session(region_name = 'eu-west-1') client = session.client('lambda') try: response = client.create_event_source_mapping( EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000', BatchSize=10, Enabled=True, FunctionName='test_func', StartingPosition='LATEST', FilterCriteria={ 'Filters': [ { 'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }, ] } ) print(response) except Exception as e: print(e)
JSON
{ "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }