DynamoDB Streams y período de vida - Amazon DynamoDB

DynamoDB Streams y período de vida

Puede realizar copias de seguridad o bien procesar los elementos eliminados por período de vida (TTL, por sus siglas en inglés) habilitando Amazon DynamoDB Streams en la tabla y procesando los registros de transmisión de los elementos vencidos. Para obtener más información, consulte Lectura y procesamiento de un flujo.

El registro de secuencias contiene el campo de identidad del usuario Records[<index>].userIdentity.

Los elementos que el proceso período de vida elimina tras su vencimiento tienen los siguientes campos:

  • Records[<index>].userIdentity.type

    "Service"

  • Records[<index>].userIdentity.principalId

    "dynamodb.amazonaws.com"

nota

Cuando utilice el TTL en una tabla global, el campo userIdentity se configurará en la región en la que se realizó el TTL. Este campo no se establecerá en otras regiones cuando se replique la eliminación.

En el código JSON siguiente se muestra la parte pertinente de un registro de secuencias único.

"Records": [ { ... "userIdentity": { "type": "Service", "principalId": "dynamodb.amazonaws.com" } ... } ]

Uso de DynamoDB Streams y Lambda para archivar elementos de TTL eliminados

La combinación de DynamoDB Time to Live (TTL), DynamoDB Streams y AWS Lambda puede simplificar el archivo de datos, reducir los costos de almacenamiento de DynamoDB y reducir la complejidad del código. El uso de Lambda como consumidor de flujos proporciona muchas ventajas, entre las que destaca la reducción de costos en comparación con otros consumidores como Kinesis Client Library (KCL). No se le cobran las llamadas a la API GetRecords en su flujo de DynamoDB cuando utiliza Lambda para consumir eventos, y Lambda puede proporcionar un filtrado de eventos mediante la identificación de patrones JSON en un evento de flujo. Con el filtrado de contenido de patrones de eventos, puede definir hasta cinco filtros diferentes para controlar qué eventos se envían a Lambda para procesarlos. De este modo, se reducen las invocaciones de sus funciones Lambda, se simplifica el código y se reduce el costo total.

Aunque DynamoDB Streams contiene todas las modificaciones de datos, como las acciones Create, Modify y Remove, esto puede dar lugar a invocaciones no deseadas de su función Lambda de archivo. Por ejemplo, supongamos que tiene una tabla con dos millones de modificaciones de datos por hora que se incluyen en el flujo, pero menos del 5 por ciento de estas son eliminaciones de elementos que vencerán a través del proceso TTL y se deben archivar. Con los filtros de origen de eventos de Lambda, la función Lambda solo se invocará 100 000 veces por hora. El resultado con el filtrado de eventos es que solo se le cobran las invocaciones necesarias en lugar de los dos millones de invocaciones que tendría sin el filtrado de eventos.

El filtrado de eventos se aplica a la asignación del origen de eventos de Lambda, que es un recurso que lee de un evento elegido (el flujo de DynamoDB) e invoca una función Lambda. En el siguiente diagrama, puede ver cómo una función Lambda consume un elemento de TTL eliminado mediante flujos y filtros de eventos.

Patrón de filtros de eventos de DynamoDB Time to Live

Si agrega el siguiente JSON a sus criterios de filtro de asignación de origen de eventos, podrá invocar su función Lambda solo para los elementos de TTL eliminados:

{ "Filters": [ { "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } } } ] }

Creación de una asignación de origen de eventos de AWS Lambda

Utilice los siguientes fragmentos de código para crear una asignación de origen de eventos filtrados que pueda conectar con el flujo de DynamoDB de una tabla. Cada bloque de código incluye el patrón del filtro de eventos.

AWS CLI
aws lambda create-event-source-mapping \ --event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \ --batch-size 10 \ --enabled \ --function-name test_func \ --starting-position LATEST \ --filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
Java
LambdaClient client = LambdaClient.builder() .region(Region.EU_WEST_1) .build(); Filter userIdentity = Filter.builder() .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}") .build(); FilterCriteria filterCriteria = FilterCriteria.builder() .filters(userIdentity) .build(); CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder() .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000") .batchSize(10) .enabled(Boolean.TRUE) .functionName("test_func") .startingPosition("LATEST") .filterCriteria(filterCriteria) .build(); try{ CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest); System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn()); }catch (ServiceException e){ System.out.println(e.getMessage()); }
Node
const client = new LambdaClient({ region: "eu-west-1" }); const input = { EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000", BatchSize: 10, Enabled: true, FunctionName: "test_func", StartingPosition: "LATEST", FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] } } const command = new CreateEventSourceMappingCommand(input); try { const results = await client.send(command); console.log(results); } catch (err) { console.error(err); }
Python
session = boto3.session.Session(region_name = 'eu-west-1') client = session.client('lambda') try: response = client.create_event_source_mapping( EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000', BatchSize=10, Enabled=True, FunctionName='test_func', StartingPosition='LATEST', FilterCriteria={ 'Filters': [ { 'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }, ] } ) print(response) except Exception as e: print(e)
JSON
{ "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }