

# Tutorial n.º 2: uso de filtros para procesar algunos eventos con DynamoDB y Lambda
<a name="Streams.Lambda.Tutorial2"></a>

En este tutorial, creará un desencadenador AWS Lambda para procesar solo algunos eventos en un flujo de una tabla de DynamoDB.

**Topics**
+ [Resumen global: CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Resumen global: CDK](#Streams.Lambda.Tutorial2.CDK)

Con el [filtrado de eventos de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) puede utilizar expresiones de filtrado para controlar qué eventos envía Lambda a su función para su procesamiento. Puede configurar hasta cinco filtros diferentes por flujos de DynamoDB. Si utiliza intervalos de lotes, Lambda aplica los criterios de filtrado a cada nuevo evento para ver si debe incluirse en el lote actual.

Los filtros se aplican mediante estructuras denominadas `FilterCriteria`. Los 3 atributos principales de `FilterCriteria` son `metadata properties`, `data properties` y `filter patterns`. 

A continuación, se muestra un ejemplo de estructura de un evento de DynamoDB Streams:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

Las `metadata properties` son los campos del objeto de evento. En el caso de DynamoDB Streams, las `metadata properties` son campos como `dynamodb` o `eventName`. 

Las `data properties` son los campos del cuerpo de evento. Para filtrar las `data properties`, asegúrese de incluirlas en `FilterCriteria` en la clave adecuada. Para los orígenes de eventos de DynamoDB, la clave de datos es `NewImage` u `OldImage`.

Por último, las reglas de filtro definirán la expresión filtro que quiere aplicar a una propiedad específica. Estos son algunos ejemplos:


| Operador de comparación | Ejemplo | Sintaxis de reglas (parcial) | 
| --- | --- | --- | 
|  Nulo  |  El tipo de producto es nulo  |  `{ "product_type": { "S": null } } `  | 
|  Vacío  |  El nombre de producto está vacío  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Igual a  |  El estado es igual a Florida  |  `{ "state": { "S": ["FL"] } } `  | 
|  Y  |  El estado del producto es igual a Florida y la categoría del producto es Chocolate  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  O  |  El estado del producto es Florida o California  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  No (Negación)  |  El estado del producto no es Florida  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Exists  |  El producto Homemade existe  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  No existe  |  El producto Homemade no existe  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Comienza por  |  PK comienza por COMPANY  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Puede especificar hasta cinco patrones de filtrado de eventos para una función Lambda. Observe que cada uno de esos cinco eventos se evaluará como un O lógico. Por lo tanto, si configura dos filtros denominados `Filter_One` y `Filter_Two`, la función Lambda ejecutará `Filter_One` O `Filter_Two`.

**nota**  
En la página de [filtrado de eventos de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) hay algunas opciones para filtrar y comparar valores numéricos; sin embargo, en el caso de los eventos de filtrado de DynamoDB, esto no se aplica porque los números en DynamoDB se almacenan como cadenas. Por ejemplo ` "quantity": { "N": "50" }`, sabemos que es un número debido a la propiedad `"N"`.

## Resumen global: CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Para mostrar la funcionalidad del filtrado de eventos en la práctica, a continuación se muestra una plantilla de CloudFormation de ejemplo. Esta plantilla generará una tabla de DynamoDB Simple con una clave de partición PK y una clave de clasificación SK con Amazon DynamoDB Streams habilitado. Creará una función Lambda y un rol de ejecución Lambda simple que permitirá escribir registros en Amazon CloudWatch y leer los eventos de Amazon DynamoDB Streams. También agregará la asignación del origen de los eventos entre DynamoDB Streams y la función Lambda, para que la función pueda ejecutarse cada vez que haya un evento en Amazon DynamoDB Streams.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Después de implementar esta plantilla de CloudFormation, puede insertar el siguiente elemento de Amazon DynamoDB:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Gracias a la sencilla función Lambda incluida en línea en esta plantilla de CloudFormation, verá los eventos en los grupos de registro de Amazon CloudWatch para la función Lambda de la siguiente manera:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Ejemplos de filtro**
+ **Solo productos que coincidan con un estado determinado**

Este ejemplo modifica la plantilla de CloudFormation para incluir un filtro que coincida con todos los productos que provienen de Florida, con la abreviatura “FL”.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Una vez que vuelva a implementar la pila, puede agregar el siguiente elemento de DynamoDB a la tabla. Tenga en cuenta que no aparecerá en los registros de la función Lambda, porque el producto de este ejemplo es de California.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Solo los elementos que comienzan por algunos valores en PK y SK**

En este ejemplo se modifica la plantilla de CloudFormation para incluir la siguiente condición:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que la condición AND requiere que la condición esté en el patrón, donde las claves PK y SK están en la misma expresión separadas por una coma.

O bien empieza con algunos valores en PK y SK o es de cierto estado.

En este ejemplo se modifica la plantilla de CloudFormation para incluir las siguientes condiciones:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que la condición OR se agrega con la incorporación de nuevos patrones en la sección de filtro.

## Resumen global: CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

La siguiente plantilla de formación de proyectos CDK de ejemplo muestra la funcionalidad de filtrado de eventos. Antes de trabajar con este proyecto CDK deberá instalar los [requisitos previos](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html), incluida la [ejecución de los scripts de preparación](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Crear un proyecto CDK**

Primero cree un nuevo proyecto AWS CDK, mediante la invocación de `cdk init` en un directorio vacío.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

El comando `cdk init` utiliza el nombre de la carpeta del proyecto para asignar un nombre a varios elementos del proyecto, incluidas las clases, las subcarpetas y los archivos. Los guiones del nombre de la carpeta se convierten en guiones bajos. Por lo demás, el nombre debe seguir el formato de un identificador Python. Por ejemplo, no debe comenzar por un número ni contener espacios.

Para trabajar con el nuevo proyecto, active su entorno virtual. Esto permite que las dependencias del proyecto se instalen localmente en la carpeta del proyecto, en lugar de hacerlo globalmente.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**nota**  
Es posible que reconozca esto como el comando de Mac/Linux para activar un entorno virtual. Las plantillas de Python incluyen un archivo por lotes, `source.bat`, que permite utilizar el mismo comando en Windows. El comando tradicional de Windows `.venv\Scripts\activate.bat` también funciona. Si ha inicializado su proyecto AWS CDK con AWS CDK Toolkit v1.70.0 o anterior, su entorno virtual se encuentra en el directorio `.env` en lugar de `.venv`. 

**Infraestructura básica**

Abra el archivo `./ddb_filters/ddb_filters_stack.py` en el editor de texto que desee. Este archivo se generó automáticamente al crear el proyecto AWS CDK. 

A continuación, agregue las funciones `_create_ddb_table` y `_set_ddb_trigger_function`. Estas funciones crearán una tabla de DynamoDB con la clave de partición PK y la clave de clasificación SK en modo de aprovisionamiento bajo demanda, con Amazon DynamoDB Streams habilitado de forma predeterminada para mostrar las imágenes nueva y antigua.

La función Lambda se almacenará en la carpeta `lambda` debajo del archivo `app.py`. Este archivo se creará más adelante. Incluirá la variable de entorno `APP_TABLE_NAME`, que será el nombre de la tabla de Amazon DynamoDB creada por esta pila. En la misma función concederemos permisos de lectura del flujo a la función Lambda. Por último, se suscribirá a DynamoDB Streams como origen de eventos para la función Lambda. 

Al final del archivo en el método `__init__`, llamará a las construcciones respectivas para inicializarlas en la pila. Para proyectos más grandes que requieran componentes y servicios adicionales, podría ser mejor definir estas construcciones fuera de la pila base. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Ahora crearemos una función Lambda muy sencilla que imprimirá los registros en Amazon CloudWatch. Para ello, cree una carpeta nueva llamada `lambda`.

```
mkdir lambda
touch app.py
```

Con su editor de texto favorito, agregue el siguiente contenido al archivo `app.py`:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Asegúrese de que está en la carpeta `/ddb_filters/` y escriba el siguiente comando para crear la aplicación de muestra:

```
cdk deploy
```

En algún momento se le pedirá que confirme si desea implementar la solución. Escriba `Y` para aceptar los cambios.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Una vez implementados los cambios, abra la consola de AWS y agregue un elemento a la tabla. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Los registros de CloudWatch ahora deben contener toda la información de esta entrada. 

**Ejemplos de filtro**
+ **Solo productos que coincidan con un estado determinado**

Abra el archivo `ddb_filters/ddb_filters/ddb_filters_stack.py` y modifíquelo para incluir el filtro que coincide con todos los productos que son iguales a “FL”. Esto se puede revisar justo debajo de `event_subscription` en la línea 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Solo los elementos que comienzan por algunos valores en PK y SK**

Modifique el script python para incluir la siguiente condición:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **O bien comience con algunos valores en PK y SK o desde un determinado estado.**

Modifique el script python para incluir las siguientes condiciones:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Observe que la condición OR se agrega al agregar más elementos a la matriz Filters.

**Limpieza**

Localice la pila de filtros en la base de su directorio de trabajo y ejecute `cdk destroy`. Se le pedirá que confirme la eliminación del recurso:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```