DynamoDB Streams と有効期限 (TTL)
テーブルに対して Amazon DynamoDB Streams を有効にし、期限切れの項目のストリーミングレコードを処理することで、有効期限 (TTL) によって削除された項目をバックアップ (または処理) できます。詳細については、「ストリームの読み込みと処理」を参照してください。
ストリームレコードにはユーザー ID フィールド Records[<index>
].userIdentity
が含まれます。
有効期限切れの後に有効期限 (TTL) プロセスによって削除された項目には、次のフィールドが含まれています。
-
Records[<index>
].userIdentity.type
"Service"
-
Records[<index>
].userIdentity.principalId
"dynamodb.amazonaws.com"
TTL をグローバルテーブルで使用すると、TTL が実行されたリージョンに userIdentity
フィールドが設定されます。削除が複製されても、このフィールドは他のリージョンには設定されません。
次の JSON は 1 つのストリームレコードの関連する部分を示しています。
"Records": [
{
...
"userIdentity": {
"type": "Service",
"principalId": "dynamodb.amazonaws.com"
}
...
}
]
DynamoDB Streams と Lambda を使用して TTL 削除済みアイテムをアーカイブする
DynamoDB 有効期限 (TTL)、DynamoDB Streams および AWS Lambda を組み合わせると、データのアーカイブを簡素化し、DynamoDB ストレージコストを削減し、コードの複雑さを軽減するのに役立ちます。ストリームコンシューマーとして Lambda を使用すると、Kinesis Client Library (KCL) などの他のコンシューマーと比較してコストが削減されるなど、多くの利点があります。Lambda を使用してイベントを消費する場合、DynamoDB ストリームの GetRecords
API 呼び出しでは課金が発生しません。Lambda はストリームイベント内の JSON パターンを識別してイベントフィルタリングを提供できます。イベントパターンのコンテンツフィルタリングでは、最大 5 つの異なるフィルターを定義して、処理のために Lambda に送信されるイベントを制御できます。これにより、Lambda 関数の呼び出しを減らしてコードを簡素化し、全体的なコストを削減できます。
DynamoDB Streams には、Create
、Modify
およびRemove
アクションなどのすべてのデータ変更が含まれています。これは、アーカイブ Lambda 関数の不要な呼び出しを引き起こす可能性があります。例えば、1 時間あたり 200 万件のデータ変更がストリームに流れ込むテーブルがあり、そのうち 5% 未満が TTL プロセスによって期限切れになり、アーカイブする必要があるアイテム削除であるとします。Lambda イベントソースフィルターを使用すると、Lambda 関数は 1 時間あたり 100,000 回しか呼び出されません。イベントフィルタリングを使用した結果、イベントフィルタリングを行わなければ 200 万回の呼び出しが発生するところを、必要な呼び出しに対してのみ課金されることになります。
イベントフィルタリングは、Lambda イベントソースマッピングに適用されます。これは、選択されたイベント (DynamoDB ストリーム) から読み取り、Lambda 関数を呼び出すリソースです。次の図は、ストリームとイベントフィルターを使用して Lambda 関数によって有効期限 (TTL) 削除済みアイテムがどのように消費されるかを示しています。
DynamoDB の有効期限 (TTL) イベントフィルターパターン
イベントソースマッピングのフィルター条件に次の JSON を追加することで、TTL 削除済みアイテムに対してのみ Lambda 関数の呼び出しを許可します。
{
"Filters": [
{
"Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
}
]
}
AWS Lambda イベントソースマッピングを作成します。
次のコードスニペットを使用して、テーブルの DynamoDB ストリームに接続できる、フィルター処理されたイベントソースマッピングを作成します。各コードブロックには、イベントフィルターパターンが含まれます。
- AWS CLI
-
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
- Java
-
LambdaClient client = LambdaClient.builder()
.region(Region.EU_WEST_1)
.build();
Filter userIdentity = Filter.builder()
.pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
.build();
FilterCriteria filterCriteria = FilterCriteria.builder()
.filters(userIdentity)
.build();
CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
.eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
.batchSize(10)
.enabled(Boolean.TRUE)
.functionName("test_func")
.startingPosition("LATEST")
.filterCriteria(filterCriteria)
.build();
try{
CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());
}catch (ServiceException e){
System.out.println(e.getMessage());
}
- Node
-
const client = new LambdaClient({ region: "eu-west-1" });
const input = {
EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
BatchSize: 10,
Enabled: true,
FunctionName: "test_func",
StartingPosition: "LATEST",
FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}
const command = new CreateEventSourceMappingCommand(input);
try {
const results = await client.send(command);
console.log(results);
} catch (err) {
console.error(err);
}
- Python
-
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')
try:
response = client.create_event_source_mapping(
EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
BatchSize=10,
Enabled=True,
FunctionName='test_func',
StartingPosition='LATEST',
FilterCriteria={
'Filters': [
{
'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
},
]
}
)
print(response)
except Exception as e:
print(e)
- JSON
-
{
"userIdentity": {
"type": ["Service"],
"principalId": ["dynamodb.amazonaws.com"]
}
}