Tutorial 2: Usar filtros para processar alguns eventos com o DynamoDB e o Lambda - Amazon DynamoDB

Tutorial 2: Usar filtros para processar alguns eventos com o DynamoDB e o Lambda

Neste tutorial, você criará um acionador do AWS Lambda para processar somente alguns eventos em um fluxo de uma tabela do DynamoDB.

Com a filtragem de eventos do Lambda, é possível utilizar expressões de filtro para controlar quais eventos o Lambda enviará para a função processar. É possível configurar até cinco filtros diferentes por fluxo do DynamoDB. Se você estiver usando janelas em lotes, o Lambda aplicará os critérios de filtro a cada novo evento para determinar se deseja adicioná-lo ao lote atual.

Os filtros são aplicados por meio de estruturas chamadas FilterCriteria. Os três principais atributos de FilterCriteria são metadata properties, data properties e filter patterns.

Aqui está um exemplo de estrutura de um evento do DynamoDB Streams:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Essas metadata properties são os campos do objeto do evento. No caso dos DynamoDB Streams, as metadata properties são campos como o dynamodb ou o eventName.

Essas data properties são os campos do corpo do evento. Para filtrar as data properties, certifique-se de contê-las em FilterCriteria dentro da chave adequada. Para fontes de eventos do DynamoDB, a chave de dados é NewImage ou OldImage.

Por fim, as regras de filtro definirão a expressão de filtro que você deseja aplicar a uma propriedade específica. Veja alguns exemplos:

Operador de comparação Exemplo Sintaxe da regra (parcial)

Nulo

O tipo de produto é nulo

{ "product_type": { "S": null } }

Vazio

O nome do produto está vazio

{ "product_name": { "S": [ ""] } }

Igual

O estado é igual a Flórida

{ "state": { "S": ["FL"] } }

E

O estado do produto é igual à Flórida e a categoria do produto é Chocolate

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Ou

O estado do produto é Flórida ou Califórnia

{ "state": { "S": ["FL","CA"] } }

Não

O estado do produto não é Flórida

{"state": {"S": [{"anything-but": ["FL"]}]}}

Existe

O produto caseiro existe

{"homemade": {"S": [{"exists": true}]}}

Não existe

O produto Homemade não existe

{"homemade": {"S": [{"exists": false}]}}

Começa com

PK começa com COMPANY

{"PK": {"S": [{"prefix": "COMPANY"}]}}

É possível especificar até cinco padrões de filtragem de eventos em uma função do Lambda. Observe que cada um desses cinco eventos será avaliado como um OR lógico. Então, se você configurar dois filtros chamados Filter_One e Filter_Two, a função do Lambda executará Filter_One OU Filter_Two.

nota

Na página de filtragem de eventos do Lambda, há algumas opções para filtrar e comparar valores numéricos. No entanto, no caso de eventos de filtro do DynamoDB, isso não se aplica porque os números no DynamoDB são armazenados como strings. Por exemplo "quantity": { "N": "50" }, sabemos que é um número por causa da propriedade "N".

Reunir todos os componentes: AWS CloudFormation

Para mostrar a funcionalidade de filtragem de eventos na prática, aqui está um exemplo de modelo do CloudFormation. Esse modelo gerará uma tabela simples do DynamoDB com uma chave de partição PK e uma chave de classificação SK com o Amazon DynamoDB Streams habilitado. Ele criará uma função do Lambda e uma função simples de execução do Lambda que permitirá gravar logs no Amazon Cloudwatch e ler os eventos do Amazon DynamoDB Stream. Ele também adicionará o mapeamento da origem do evento entre os DynamoDB Streams e a função do Lambda, para que a função possa ser executada sempre que houver um evento no Amazon DynamoDB Streams.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

Depois de implantar esse modelo de formação de nuvem, é possível inserir o seguinte item do Amazon DynamoDB:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Graças à função simples do Lambda incluída em linha nesse modelo de formação de nuvem, você verá os eventos nos grupos de logs do Amazon CloudWatch para a função do Lambda da seguinte forma:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Exemplos de filtragem

  • Somente produtos que correspondam a um determinado estado

Este exemplo modifica o modelo do CloudFormation para incluir um filtro que corresponda a todos os produtos provenientes da Flórida, com a abreviatura “FL”.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Depois de reimplantar a pilha, é possível adicionar o seguinte item do DynamoDB à tabela. Observe que ele não aparecerá nos logs de funções do Lambda, porque o produto neste exemplo é da Califórnia.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • Somente os itens que começam com alguns valores em PK e SK

Este exemplo modifica o modelo do CloudFormation para incluir a seguinte condição:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Observe que a condição AND exige que a condição esteja dentro do padrão, onde as chaves PK e SK estão na mesma expressão separadas por vírgula.

Comece com alguns valores em PK e SK ou de determinado estado.

Este exemplo modifica o modelo do CloudFormation para incluir as seguintes condições:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Observe que a condição OR é adicionada introduzindo novos padrões na seção de filtro.

Reunir todos os componentes: CDK

O exemplo de modelo de formação de projeto CDK a seguir mostra a funcionalidade de filtragem de eventos. Antes de trabalhar com esse projeto de CDK, será preciso instalar os pré-requisitos, incluindo a execução de scripts de preparação.

Criar um projeto de CDK

Primeiro, crie um novo projeto do AWS CDK, invocando cdk init em um diretório vazio.

mkdir ddb_filters cd ddb_filters cdk init app --language python

O comando cdk init usa o nome da pasta do projeto para nomear vários elementos do projeto, incluindo classes, subpastas e arquivos. Todos os hifens no nome da pasta são convertidos em sublinhados. Caso contrário, o nome deve seguir a forma de um identificador Python. Por exemplo, ele não deve começar com um número nem conter espaços.

Para trabalhar com o novo projeto, ative o respectivo ambiente virtual. Isso permite que as dependências do projeto sejam instaladas localmente na pasta do projeto, em vez de globalmente.

source .venv/bin/activate python -m pip install -r requirements.txt
nota

É possível reconhecer isso como o comando Mac/Linux para ativar um ambiente virtual. Os modelos do Python incluem um arquivo em lote, source.bat, que permite que o mesmo comando seja utilizado no Windows. O comando tradicional do Windows .venv\Scripts\activate.bat também funciona. Se você inicializou seu projeto do AWS CDK usando o AWS CDK Toolkit v1.70.0 ou anterior, seu ambiente virtual está no diretório .env em vez de .venv.

Infraestrutura base

Abra o arquivo ./ddb_filters/ddb_filters_stack.py com o editor de texto de sua preferência. Esse arquivo foi gerado automaticamente quando você criou o projeto do AWS CDK.

Em seguida, adicione as funções _create_ddb_table e _set_ddb_trigger_function. Essas funções criarão uma tabela do DynamoDB com a chave de partição PK e a chave de classificação SK no modo de provisionamento sob demanda, com o Amazon DynamoDB Streams habilitado por padrão para mostrar imagens novas e antigas.

A função do Lambda será armazenada na pasta lambda abaixo do arquivo app.py. Esse arquivo será criado posteriormente. Ele incluirá uma variável de ambiente APP_TABLE_NAME, que será o nome da tabela do Amazon DynamoDB criada por essa pilha. Na mesma função, concederemos permissões de leitura de fluxo para a função do Lambda. Por fim, ele se inscreverá no DynamoDB Streams como fonte de eventos para a função do Lambda.

No final do arquivo no método __init__, você chamará as respectivas estruturas para inicializá-las na pilha. Para projetos maiores que exigem componentes e serviços adicionais, talvez seja melhor definir essas estruturas fora da pilha base.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

Agora, criaremos uma função do Lambda muito simples que imprimirá os logs no Amazon CloudWatch. Para fazer isso, crie uma pasta chamada lambda.

mkdir lambda touch app.py

Usando o editor de texto de sua preferência, adicione o seguinte conteúdo ao arquivo app.py:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

Garantindo que você esteja na pasta /ddb_filters/, digite o seguinte comando para criar a aplicação de exemplo:

cdk deploy

Em algum momento, você deverá confirmar se deseja implantar a solução. Aceite as alterações digitando Y.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

Depois que as alterações forem implantadas, abra o console da AWS e adicione um item à tabela.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Os logs do CloudWatch agora devem conter todas as informações dessa entrada.

Exemplos de filtragem

  • Somente produtos que correspondam a um determinado estado

Abra o arquivo ddb_filters/ddb_filters/ddb_filters_stack.py e modifique-o para incluir o filtro que corresponde a todos os produtos que são iguais a “FL”. Isso pode ser revisado logo abaixo de event_subscription na linha 45.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • Somente os itens que começam com alguns valores em PK e SK

Modifique o script Python para incluir a seguinte condição:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • Comece com alguns valores em PK e SK ou de determinado estado.

Modifique o script Python para incluir as seguintes condições:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

Observe que a condição OR é adicionada incluindo mais elementos à matriz Filters (Filtros).

Limpeza

Localize a pilha de filtros na base do diretório de trabalho e execute cdk destroy. Confirme a exclusão do recurso:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y