

# Tutorial 2: Usar filtros para processar alguns eventos com o DynamoDB e o Lambda
<a name="Streams.Lambda.Tutorial2"></a>

Neste tutorial, você criará um acionador do AWS Lambda para processar somente alguns eventos em um fluxo de uma tabela do DynamoDB.

**Topics**
+ [Reunir todos os componentes: CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Reunir todos os componentes: CDK](#Streams.Lambda.Tutorial2.CDK)

Com a [filtragem de eventos do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), é possível utilizar expressões de filtro para controlar quais eventos o Lambda enviará para a função processar. É possível configurar até cinco filtros diferentes por fluxo do DynamoDB. Se você estiver usando janelas em lotes, o Lambda aplicará os critérios de filtro a cada novo evento para determinar se deseja adicioná-lo ao lote atual.

Os filtros são aplicados por meio de estruturas chamadas `FilterCriteria`. Os três principais atributos de `FilterCriteria` são `metadata properties`, `data properties` e `filter patterns`. 

Aqui está um exemplo de estrutura de um evento do DynamoDB Streams:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

Essas `metadata properties` são os campos do objeto do evento. No caso dos DynamoDB Streams, as `metadata properties` são campos como o `dynamodb` ou o `eventName`. 

Essas `data properties` são os campos do corpo do evento. Para filtrar as `data properties`, certifique-se de contê-las em `FilterCriteria` dentro da chave adequada. Para fontes de eventos do DynamoDB, a chave de dados é `NewImage` ou `OldImage`.

Por fim, as regras de filtro definirão a expressão de filtro que você deseja aplicar a uma propriedade específica. Veja alguns exemplos:


| Operador de comparação | Exemplo | Sintaxe da regra (parcial) | 
| --- | --- | --- | 
|  Nulo  |  O tipo de produto é nulo  |  `{ "product_type": { "S": null } } `  | 
|  Vazio  |  O nome do produto está vazio  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Igual  |  O estado é igual a Flórida  |  `{ "state": { "S": ["FL"] } } `  | 
|  E  |  O estado do produto é igual à Flórida e a categoria do produto é Chocolate  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Ou  |  O estado do produto é Flórida ou Califórnia  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Não  |  O estado do produto não é Flórida  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Existe  |  O produto caseiro existe  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  Não existe  |  O produto Homemade não existe  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Começa com  |  PK começa com COMPANY  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

É possível especificar até cinco padrões de filtragem de eventos em uma função do Lambda. Observe que cada um desses cinco eventos será avaliado como um OR lógico. Então, se você configurar dois filtros chamados `Filter_One` e `Filter_Two`, a função do Lambda executará `Filter_One` OU `Filter_Two`.

**nota**  
Na página de [filtragem de eventos do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), há algumas opções para filtrar e comparar valores numéricos. No entanto, no caso de eventos de filtro do DynamoDB, isso não se aplica porque os números no DynamoDB são armazenados como strings. Por exemplo ` "quantity": { "N": "50" }`, sabemos que é um número por causa da propriedade `"N"`.

## Reunir todos os componentes: CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Para mostrar a funcionalidade de filtragem de eventos na prática, aqui está um exemplo de modelo do CloudFormation. Esse modelo gerará uma tabela simples do DynamoDB com uma chave de partição PK e uma chave de classificação SK com o Amazon DynamoDB Streams habilitado. Ele criará uma função do Lambda e uma função simples de execução do Lambda que permitirá gravar logs no Amazon Cloudwatch e ler os eventos do Amazon DynamoDB Stream. Ele também adicionará o mapeamento da origem do evento entre os DynamoDB Streams e a função do Lambda, para que a função possa ser executada sempre que houver um evento no Amazon DynamoDB Streams.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Depois de implantar esse modelo de formação de nuvem, é possível inserir o seguinte item do Amazon DynamoDB:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Graças à função simples do Lambda incluída em linha nesse modelo de formação de nuvem, você verá os eventos nos grupos de logs do Amazon CloudWatch para a função do Lambda da seguinte forma:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Exemplos de filtragem**
+ **Somente produtos que correspondam a um determinado estado**

Este exemplo modifica o modelo do CloudFormation para incluir um filtro que corresponda a todos os produtos provenientes da Flórida, com a abreviatura “FL”.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Depois de reimplantar a pilha, é possível adicionar o seguinte item do DynamoDB à tabela. Observe que ele não aparecerá nos logs de funções do Lambda, porque o produto neste exemplo é da Califórnia.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Somente os itens que começam com alguns valores em PK e SK**

Este exemplo modifica o modelo do CloudFormation para incluir a seguinte condição:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que a condição AND exige que a condição esteja dentro do padrão, onde as chaves PK e SK estão na mesma expressão separadas por vírgula.

Comece com alguns valores em PK e SK ou de determinado estado.

Este exemplo modifica o modelo do CloudFormation para incluir as seguintes condições:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que a condição OR é adicionada introduzindo novos padrões na seção de filtro.

## Reunir todos os componentes: CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

O exemplo de modelo de formação de projeto CDK a seguir mostra a funcionalidade de filtragem de eventos. Antes de trabalhar com esse projeto de CDK, será preciso [instalar os pré-requisitos](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html), incluindo a [execução de scripts de preparação](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Criar um projeto de CDK**

Primeiro, crie um novo projeto do AWS CDK, invocando `cdk init` em um diretório vazio.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

O comando `cdk init` usa o nome da pasta do projeto para nomear vários elementos do projeto, incluindo classes, subpastas e arquivos. Todos os hifens no nome da pasta são convertidos em sublinhados. Caso contrário, o nome deve seguir a forma de um identificador Python. Por exemplo, ele não deve começar com um número nem conter espaços.

Para trabalhar com o novo projeto, ative o respectivo ambiente virtual. Isso permite que as dependências do projeto sejam instaladas localmente na pasta do projeto, em vez de globalmente.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**nota**  
É possível reconhecer isso como o comando Mac/Linux para ativar um ambiente virtual. Os modelos do Python incluem um arquivo em lote, `source.bat`, que permite que o mesmo comando seja utilizado no Windows. O comando tradicional do Windows `.venv\Scripts\activate.bat` também funciona. Se você inicializou seu projeto do AWS CDK usando o AWS CDK Toolkit v1.70.0 ou anterior, seu ambiente virtual está no diretório `.env` em vez de `.venv`. 

**Infraestrutura base**

Abra o arquivo `./ddb_filters/ddb_filters_stack.py` com o editor de texto de sua preferência. Esse arquivo foi gerado automaticamente quando você criou o projeto do AWS CDK. 

Em seguida, adicione as funções `_create_ddb_table` e `_set_ddb_trigger_function`. Essas funções criarão uma tabela do DynamoDB com a chave de partição PK e a chave de classificação SK no modo de provisionamento sob demanda, com o Amazon DynamoDB Streams habilitado por padrão para mostrar imagens novas e antigas.

A função do Lambda será armazenada na pasta `lambda` abaixo do arquivo `app.py`. Esse arquivo será criado posteriormente. Ele incluirá uma variável de ambiente `APP_TABLE_NAME`, que será o nome da tabela do Amazon DynamoDB criada por essa pilha. Na mesma função, concederemos permissões de leitura de fluxo para a função do Lambda. Por fim, ele se inscreverá no DynamoDB Streams como fonte de eventos para a função do Lambda. 

No final do arquivo no método `__init__`, você chamará as respectivas estruturas para inicializá-las na pilha. Para projetos maiores que exigem componentes e serviços adicionais, talvez seja melhor definir essas estruturas fora da pilha base. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Agora, criaremos uma função do Lambda muito simples que imprimirá os logs no Amazon CloudWatch. Para fazer isso, crie uma pasta chamada `lambda`.

```
mkdir lambda
touch app.py
```

Usando o editor de texto de sua preferência, adicione o seguinte conteúdo ao arquivo `app.py`:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Garantindo que você esteja na pasta `/ddb_filters/`, digite o seguinte comando para criar a aplicação de exemplo:

```
cdk deploy
```

Em algum momento, você deverá confirmar se deseja implantar a solução. Aceite as alterações digitando `Y`.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Depois que as alterações forem implantadas, abra o console da AWS e adicione um item à tabela. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Os logs do CloudWatch agora devem conter todas as informações dessa entrada. 

**Exemplos de filtragem**
+ **Somente produtos que correspondam a um determinado estado**

Abra o arquivo `ddb_filters/ddb_filters/ddb_filters_stack.py` e modifique-o para incluir o filtro que corresponde a todos os produtos que são iguais a “FL”. Isso pode ser revisado logo abaixo de `event_subscription` na linha 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Somente os itens que começam com alguns valores em PK e SK**

Modifique o script Python para incluir a seguinte condição:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **Comece com alguns valores em PK e SK ou de determinado estado.**

Modifique o script Python para incluir as seguintes condições:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Observe que a condição OR é adicionada incluindo mais elementos à matriz Filters (Filtros).

**Limpeza**

Localize a pilha de filtros na base do diretório de trabalho e execute `cdk destroy`. Confirme a exclusão do recurso:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```