Tutorial n.º 2: uso de filtros para procesar algunos eventos con DynamoDB y Lambda - Amazon DynamoDB

Tutorial n.º 2: uso de filtros para procesar algunos eventos con DynamoDB y Lambda

En este tutorial, creará un desencadenador AWS Lambda para procesar solo algunos eventos en un flujo de una tabla de DynamoDB.

Con el filtrado de eventos de Lambda puede utilizar expresiones de filtrado para controlar qué eventos envía Lambda a su función para su procesamiento. Puede configurar hasta cinco filtros diferentes por flujos de DynamoDB. Si utiliza intervalos de lotes, Lambda aplica los criterios de filtrado a cada nuevo evento para ver si debe incluirse en el lote actual.

Los filtros se aplican mediante estructuras denominadas FilterCriteria. Los 3 atributos principales de FilterCriteria son metadata properties, data properties y filter patterns.

A continuación, se muestra un ejemplo de estructura de un evento de DynamoDB Streams:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Las metadata properties son los campos del objeto de evento. En el caso de DynamoDB Streams, las metadata properties son campos como dynamodb o eventName.

Las data properties son los campos del cuerpo de evento. Para filtrar las data properties, asegúrese de incluirlas en FilterCriteria en la clave adecuada. Para los orígenes de eventos de DynamoDB, la clave de datos es NewImage u OldImage.

Por último, las reglas de filtro definirán la expresión filtro que quiere aplicar a una propiedad específica. Estos son algunos ejemplos:

Operador de comparación Ejemplo Sintaxis de reglas (parcial)

Nulo

El tipo de producto es nulo

{ "product_type": { "S": null } }

Vacío

El nombre de producto está vacío

{ "product_name": { "S": [ ""] } }

Igual a

El estado es igual a Florida

{ "state": { "S": ["FL"] } }

Y

El estado del producto es igual a Florida y la categoría del producto es Chocolate

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

O

El estado del producto es Florida o California

{ "state": { "S": ["FL","CA"] } }

No

El estado del producto no es Florida

{"state": {"S": [{"anything-but": ["FL"]}]}}

Existe

El producto Homemade existe

{"homemade": {"S": [{"exists": true}]}}

No existe

El producto Homemade no existe

{"homemade": {"S": [{"exists": false}]}}

Comienza por

PK comienza por COMPANY

{"PK": {"S": [{"prefix": "COMPANY"}]}}

Puede especificar hasta cinco patrones de filtrado de eventos para una función Lambda. Observe que cada uno de esos cinco eventos se evaluará como un O lógico. Por lo tanto, si configura dos filtros denominados Filter_One y Filter_Two, la función Lambda ejecutará Filter_One O Filter_Two.

nota

En la página de filtrado de eventos de Lambda hay algunas opciones para filtrar y comparar valores numéricos; sin embargo, en el caso de los eventos de filtrado de DynamoDB, esto no se aplica porque los números en DynamoDB se almacenan como cadenas. Por ejemplo "quantity": { "N": "50" }, sabemos que es un número debido a la propiedad "N".

Resumen global: AWS CloudFormation

Para mostrar la funcionalidad del filtrado de eventos en la práctica, a continuación se muestra una plantilla de CloudFormation de ejemplo. Esta plantilla generará una tabla de DynamoDB Simple con una clave de partición PK y una clave de clasificación SK con Amazon DynamoDB Streams habilitado. Creará una función Lambda y un rol de ejecución Lambda simple que permitirá escribir registros en Amazon CloudWatch y leer los eventos de Amazon DynamoDB Streams. También agregará la asignación del origen de los eventos entre DynamoDB Streams y la función Lambda, para que la función pueda ejecutarse cada vez que haya un evento en Amazon DynamoDB Streams.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

Después de implementar esta plantilla de CloudFormation, puede insertar el siguiente elemento de Amazon DynamoDB:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Gracias a la sencilla función Lambda incluida en línea en esta plantilla de CloudFormation, verá los eventos en los grupos de registro de Amazon CloudWatch para la función Lambda de la siguiente manera:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Ejemplos de filtro

  • Solo productos que coincidan con un estado determinado

Este ejemplo modifica la plantilla de CloudFormation para incluir un filtro que coincida con todos los productos que provienen de Florida, con la abreviatura “FL”.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Una vez que vuelva a implementar la pila, puede agregar el siguiente elemento de DynamoDB a la tabla. Tenga en cuenta que no aparecerá en los registros de la función Lambda, porque el producto de este ejemplo es de California.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • Solo los elementos que comienzan por algunos valores en PK y SK

En este ejemplo se modifica la plantilla de CloudFormation para incluir la siguiente condición:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Observe que la condición AND requiere que la condición esté en el patrón, donde las claves PK y SK están en la misma expresión separadas por una coma.

O bien empieza con algunos valores en PK y SK o es de cierto estado.

En este ejemplo se modifica la plantilla de CloudFormation para incluir las siguientes condiciones:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Observe que la condición OR se agrega con la incorporación de nuevos patrones en la sección de filtro.

Resumen global: CDK

La siguiente plantilla de formación de proyectos CDK de ejemplo muestra la funcionalidad de filtrado de eventos. Antes de trabajar con este proyecto CDK deberá instalar los requisitos previos, incluida la ejecución de los scripts de preparación.

Crear un proyecto CDK

Primero cree un nuevo proyecto AWS CDK, mediante la invocación de cdk init en un directorio vacío.

mkdir ddb_filters cd ddb_filters cdk init app --language python

El comando cdk init utiliza el nombre de la carpeta del proyecto para asignar un nombre a varios elementos del proyecto, incluidas las clases, las subcarpetas y los archivos. Los guiones del nombre de la carpeta se convierten en guiones bajos. Por lo demás, el nombre debe seguir el formato de un identificador Python. Por ejemplo, no debe comenzar por un número ni contener espacios.

Para trabajar con el nuevo proyecto, active su entorno virtual. Esto permite que las dependencias del proyecto se instalen localmente en la carpeta del proyecto, en lugar de hacerlo globalmente.

source .venv/bin/activate python -m pip install -r requirements.txt
nota

Es posible que reconozca esto como el comando de Mac/Linux para activar un entorno virtual. Las plantillas de Python incluyen un archivo por lotes, source.bat, que permite utilizar el mismo comando en Windows. El comando tradicional de Windows .venv\Scripts\activate.bat también funciona. Si ha inicializado su proyecto AWS CDK con AWS CDK Toolkit v1.70.0 o anterior, su entorno virtual se encuentra en el directorio .env en lugar de .venv.

Infraestructura básica

Abra el archivo ./ddb_filters/ddb_filters_stack.py en el editor de texto que desee. Este archivo se generó automáticamente al crear el proyecto AWS CDK.

A continuación, agregue las funciones _create_ddb_table y _set_ddb_trigger_function. Estas funciones crearán una tabla de DynamoDB con la clave de partición PK y la clave de clasificación SK en modo de aprovisionamiento bajo demanda, con Amazon DynamoDB Streams habilitado de forma predeterminada para mostrar las imágenes nueva y antigua.

La función Lambda se almacenará en la carpeta lambda debajo del archivo app.py. Este archivo se creará más adelante. Incluirá la variable de entorno APP_TABLE_NAME, que será el nombre de la tabla de Amazon DynamoDB creada por esta pila. En la misma función concederemos permisos de lectura del flujo a la función Lambda. Por último, se suscribirá a DynamoDB Streams como origen de eventos para la función Lambda.

Al final del archivo en el método __init__, llamará a las construcciones respectivas para inicializarlas en la pila. Para proyectos más grandes que requieran componentes y servicios adicionales, podría ser mejor definir estas construcciones fuera de la pila base.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

Ahora crearemos una función Lambda muy sencilla que imprimirá los registros en Amazon CloudWatch. Para ello, cree una carpeta nueva llamada lambda.

mkdir lambda touch app.py

Con su editor de texto favorito, agregue el siguiente contenido al archivo app.py:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

Asegúrese de que está en la carpeta /ddb_filters/ y escriba el siguiente comando para crear la aplicación de muestra:

cdk deploy

En algún momento se le pedirá que confirme si desea implementar la solución. Escriba Y para aceptar los cambios.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

Una vez implementados los cambios, abra la consola de AWS y agregue un elemento a la tabla.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Los registros de CloudWatch ahora deben contener toda la información de esta entrada.

Ejemplos de filtro

  • Solo productos que coincidan con un estado determinado

Abra el archivo ddb_filters/ddb_filters/ddb_filters_stack.py y modifíquelo para incluir el filtro que coincide con todos los productos que son iguales a “FL”. Esto se puede revisar justo debajo de event_subscription en la línea 45.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • Solo los elementos que comienzan por algunos valores en PK y SK

Modifique el script python para incluir la siguiente condición:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • O bien comience con algunos valores en PK y SK o desde un determinado estado.

Modifique el script python para incluir las siguientes condiciones:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

Observe que la condición OR se agrega al agregar más elementos a la matriz Filters.

Limpieza

Localice la pila de filtros en la base de su directorio de trabajo y ejecute cdk destroy. Se le pedirá que confirme la eliminación del recurso:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y