

# Usar o AWS Lambda com o Amazon DynamoDB
<a name="with-ddb"></a>

**nota**  
Se você deseja enviar dados para um destino que não seja uma função do Lambda ou enriquecer os dados antes de enviá-los, consulte [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html) (Pipes do Amazon EventBridge).

É possível usar uma função do AWS Lambda para processar registros em um [Amazon DynamoDB Stream](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html). Com o DynamoDB Streams, você pode acionar uma função do Lambda para executar o trabalho adicional cada vez que uma tabela do DynamoDB é atualizada.

Ao processar fluxos do DynamoDB, é necessário implementar uma lógica de resposta em lote parcial para evitar que registros processados com sucesso sejam repetidos quando alguns registros em um lote falham. O [utilitário Processador em Lote](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) do Powertools para AWS Lambda está disponível em Python, TypeScript, .NET e Java e simplifica essa implementação ao lidar automaticamente com a lógica de resposta em lote parcial, reduzindo o tempo de desenvolvimento e melhorando a confiabilidade.

**Topics**
+ [Fluxos de sondagem e agrupamento em lotes](#dynamodb-polling-and-batching)
+ [Posições iniciais de sondagem e fluxo](#dyanmo-db-stream-poll)
+ [Leitores simultâneos de um fragmento no DynamoDB Streams](#events-dynamodb-simultaneous-readers)
+ [Evento de exemplo](#events-sample-dynamodb)
+ [Processar registros do DynamoDB com o Lambda](services-dynamodb-eventsourcemapping.md)
+ [Configurar resposta em lote parcial com o DynamoDB e o Lambda](services-ddb-batchfailurereporting.md)
+ [Reter registros descartados para uma origem de eventos do DynamoDB no Lambda](services-dynamodb-errors.md)
+ [Implementar o processamento com estado do DynamoDB no Lambda](services-ddb-windows.md)
+ [Parâmetros do Lambda para os mapeamentos das origens de eventos do Amazon DynamoDB](services-ddb-params.md)
+ [Usar a filtragem de eventos com uma origem de eventos do DynamoDB](with-ddb-filtering.md)
+ [Tutorial: Usar o AWS Lambda com o Amazon DynamoDB Streams](with-ddb-example.md)

## Fluxos de sondagem e agrupamento em lotes
<a name="dynamodb-polling-and-batching"></a>

O Lambda sonda os fragmentos em sua transmissão do DynamoDB em busca de registros a uma taxa básica de 4 vezes por segundo. Quando os registros estão disponíveis, o Lambda invoca a função e aguarda o resultado. Se o processamento for bem-sucedido, o Lambda continua a sondagem até que ela receba mais registros.

Por padrão, o Lambda invoca a função assim que os registros estão disponíveis. Se o lote que o Lambda lê da fonte de eventos tiver apenas um registro, o Lambda enviará apenas um registro à função. Para evitar a invocação da função com poucos registros, instrua a fonte de eventos para armazenar os registros em buffer por até cinco minutos, configurando uma *janela de lotes*. Antes de invocar a função, o Lambda continua a ler registros da fonte de eventos até coletar um lote inteiro, até que a janela de lote expire ou até que o lote atinja o limite de carga útil de 6 MB. Para obter mais informações, consulte [Comportamento de lotes](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**Atenção**  
Os mapeamentos da origem do evento do Lambda processam cada evento ao menos uma vez, podendo haver o processamento duplicado de registros. Para evitar possíveis problemas relacionados a eventos duplicados, é altamente recomendável tornar o código da função idempotente. Para saber mais, consulte [Como tornar minha função do Lambda idempotente](https://repost.aws/knowledge-center/lambda-function-idempotent) no Centro de Conhecimentos da AWS.

O Lambda não espera a conclusão de nenhuma [extensão](lambda-extensions.md)configurada para enviar o próximo lote para processamento. Em outras palavras, suas extensões podem continuar sendo executadas enquanto o Lambda processa o próximo lote de registros. Isso pode causar problemas de controle de utilização se você violar quaisquer configurações ou limites de [simultaneidade](lambda-concurrency.md) de sua conta. Para detectar se esse é um problema em potencial, monitore suas funções e verifique se você está vendo [métricas de simultaneidade](monitoring-concurrency.md#general-concurrency-metrics) mais altas do que o esperado para o seu mapeamento da origem do evento. Devido ao curto intervalo entre as invocações, o Lambda pode relatar brevemente um uso de simultaneidade maior do que o número de fragmentos. Isso pode ser verdadeiro até mesmo para funções do Lambda sem extensões.

Defina a configuração [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) para processar um fragmento de um fluxo de dados do DynamoDB com mais de uma invocação do Lambda simultaneamente. Você pode especificar o número de lotes simultâneos que o Lambda pesquisa de um fragmento por meio de um fator de paralelização de 1 (padrão) a 10. Por exemplo, quando você define `ParallelizationFactor` como 2, pode ter até 200 invocações simultâneas do Lambda para processar 100 fragmentos de fluxos do DynamoDB (embora, na prática, você possa ver valores diferentes para a métrica `ConcurrentExecutions`). Isso ajuda a aumentar a escala do throughput de processamento quando o volume de dados é volátil e o valor de [IteratorAge](monitoring-metrics-types.md#performance-metrics) é alto. Quando você aumentar o número de lotes simultâneos por fragmento, o Lambda ainda garantirá o processamento por ordem no nível de item (partição e chave de classificação).

## Posições iniciais de sondagem e fluxo
<a name="dyanmo-db-stream-poll"></a>

Esteja ciente de que a sondagem do fluxo durante a criação e as atualizações do mapeamento da origem do evento é, finalmente, consistente.
+ Durante a criação do mapeamento da origem do evento, pode levar alguns minutos para a sondagem de eventos do fluxo iniciar.
+ Durante as atualizações do mapeamento da origem do evento, pode levar alguns minutos para interromper e reiniciar a sondagem de eventos do fluxo.

Esse comportamento significa que, se você especificar `LATEST` como posição inicial do fluxo, o mapeamento da origem do evento pode perder eventos durante a criação ou as atualizações. Para garantir que nenhum evento seja perdido, especifique a posição inicial do fluxo como `TRIM_HORIZON`.

## Leitores simultâneos de um fragmento no DynamoDB Streams
<a name="events-dynamodb-simultaneous-readers"></a>

Para tabelas de região única que não são tabelas globais, você pode projetar até duas funções do Lambda para ler o mesmo fragmento do DynamoDB Streams ao mesmo tempo. Exceder esse limite pode resultar em controle de utilização de solicitação. Para tabelas globais, recomendamos que você limite o número de leitores simultâneos para 1 para evitar o controle de utilização de solicitações.

## Evento de exemplo
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# Processar registros do DynamoDB com o Lambda
<a name="services-dynamodb-eventsourcemapping"></a>

Crie um mapeamento de fontes de eventos para orientar o Lambda a enviar registros de sua transmissão para uma função do Lambda. É possível criar vários mapeamentos de origem de evento para processar os mesmos dados com várias funções do Lambda ou processar itens de vários fluxos com uma única função.

Você pode configurar mapeamentos da origem do evento para processar registros de um fluxo em outra Conta da AWS. Para saber mais, consulte [Como criar mapeamentos da origem do evento entre contas](#services-dynamodb-eventsourcemapping-cross-account).

Para configurar sua função para ler do DynamoDB Streams, anexe a política [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) gerenciada pela AWS ao seu perfil de execução e, em seguida, crie um gatilho **DynamoDB**.

**Para adicionar permissões e criar um acionador**

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda.

1. Escolha o nome de uma função.

1. Escolha a guia **Configuration** (Configuração) e, depois, **Permissions** (Permissões).

1. Em **Nome do perfil**, escolha o link para seu perfil de execução. Esse link abre o perfil no console do IAM.  
![\[\]](http://docs.aws.amazon.com/pt_br/lambda/latest/dg/images/execution-role.png)

1. Escolha **Adicionar permissões** e depois **Anexar políticas**.  
![\[\]](http://docs.aws.amazon.com/pt_br/lambda/latest/dg/images/attach-policies.png)

1. No campo de pesquisa, digite `AWSLambdaDynamoDBExecutionRole`. Adicione esta política ao seu perfil de execução Essa é uma política gerenciada pela AWS que contém as permissões de que a função precisa para ler um fluxo do DynamoDB. Para obter mais informações sobre essa política, consulte [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) na *Referência de política gerenciada da AWS*.

1. Volte para a sua função no console do Lambda Em **Visão geral da função**, escolha **Adicionar gatilho**.  
![\[\]](http://docs.aws.amazon.com/pt_br/lambda/latest/dg/images/add-trigger.png)

1. Escolha um tipo de acionador.

1. Configure as opções necessárias e escolha **Add** (Adicionar).

O Lambda é compatível com as seguintes opções de origens de eventos do DynamoDB:

**Opções de fonte do evento**
+ **DynamoDB table** (Tabela do DynamoDB): a tabela do DynamoDB da qual os registros serão lidos.
+ **Batch size** (Tamanho do lote): o número de registros a serem enviados para a função em cada lote, até 10.000. O Lambda transmite todos os registros no batch para a função em uma única chamada, enquanto o tamanho total dos eventos não exceder o [limite de carga útil](gettingstarted-limits.md) para invocação síncrona (6 MB).
+ **Batch window** (Janela de lote): especifique o máximo de tempo para reunir registros antes de invocar a função, em segundos.
+ **Starting position** (Posição inicial): processe apenas registros novos ou todos os registros existentes.
  + **Latest** (Mais recente): processe novos registros adicionados ao fluxo.
  + **Trim horizon** (Redução horizontal): processe todos os registros na transmissão.

  Depois de processar todos os registros existentes, a função é capturada e continua a processar novos registros.
+ **Destino na falha**: uma fila padrão do SQS ou um tópico padrão do SNS para registros que não podem ser processados. Quando o Lambda descarta um lote de registros que é muito antigo ou esgotou todas as tentativas, ele envia detalhes sobre o lote à fila ou ao tópico.
+ **Retry attempts** (Tentativas de repetição): o número máximo de vezes que o Lambda tenta novamente quando a função retorna um erro. Isso não se aplica a erros de serviço ou controles em que o lote não atingiu a função.
+ **Idade máxima do registro**: a idade máxima de um registro que o Lambda envia para sua função.
+ **Dividir lote por erro**: quando a função retornar um erro, o lote é dividido em dois antes de uma nova tentativa. A configuração do tamanho do lote original permanece inalterada.
+ **Concurrent batches per shard** (Lotes simultâneos por fragmento): processa simultaneamente vários lotes do mesmo fragmento.
+ **Enabled** (Habilitado): defina como verdadeiro para habilitar o mapeamento de fontes de eventos. Defina como falso para interromper o processamento de registros. O Lambda monitora o último registro processado e retoma o processamento a partir desse ponto quando o mapeamento é habilitado novamente.

**nota**  
Você não é cobrado por chamadas da API GetRecords invocadas pelo Lambda como parte de acionadores do DynamoDB.

Para gerenciar a configuração da fonte do evento posteriormente, escolha o gatilho no designer.

## Como criar mapeamentos da origem do evento entre contas
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

O Amazon DynamoDB agora é compatível com [políticas baseadas em recursos](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html). Com esse recurso, você pode processar dados de um fluxo do DynamoDB em uma Conta da AWS com uma função do Lambda em outra conta.

Para criar um mapeamento da origem do evento para sua função do Lambda usando um fluxo do DynamoDB em outra Conta da AWS, você deve configurar o fluxo usando uma política baseada em recursos para dar permissão à função do Lambda para ler registros. Para saber como configurar o fluxo para permitir acesso entre contas, consulte [Compartilhar acesso com funções do Lambda entre contas](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access) no *Guia do desenvolvedor do Amazon DynamoDB*.

Depois de configurar o fluxo com uma política baseada em recursos que conceda à função do Lambda as permissões necessárias, crie o mapeamento da origem do evento com seu ARN do fluxo entre contas. Você pode encontrar o ARN do fluxo na guia **Exportações e fluxos** da tabela no console do DynamoDB entre contas. 

Ao usar o console do Lambda, cole o ARN do fluxo diretamente no campo de entrada da tabela do DynamoDB na página de criação do mapeamento da origem do evento.

 **Observação:** não há suporte para gatilhos entre regiões. 

# Configurar resposta em lote parcial com o DynamoDB e o Lambda
<a name="services-ddb-batchfailurereporting"></a>

Ao consumir e processar dados de transmissão de uma fonte de eventos, o Lambda definirá checkpoints por padrão no número mais elevado na sequência de um lote somente quando o lote for um sucesso total. O Lambda trata todos os outros resultados como uma falha completa e tenta processar novamente o lote até o limite de novas tentativas. Para permitir sucessos parciais durante o processamento de lotes de um stream, ative `ReportBatchItemFailures`. Permitir sucessos parciais pode ajudar a reduzir o número de novas tentativas em um registro, embora não impeça totalmente a possibilidade de novas tentativas em um registro bem-sucedido.

Para ativar `ReportBatchItemFailures`, inclua o valor de enum **ReportBatchItemFailures** na lista [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes). Essa lista indica quais tipos de resposta estão habilitados para sua função. Você pode configurar essa lista ao [criar](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) ou [atualizar](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) um mapeamento de origem de eventos.

**nota**  
Mesmo quando seu código de função retorna respostas parciais de falha em lote, essas respostas não serão processadas pelo Lambda, a menos que o atributo `ReportBatchItemFailures` esteja explicitamente ativado para o mapeamento da origem do evento.

## Sintaxe do relatório
<a name="streams-batchfailurereporting-syntax"></a>

Ao configurar relatórios sobre falhas de itens de lote, a classe `StreamsEventResponse` é retornada com uma lista de falhas de itens de lote. É possível usar um objeto `StreamsEventResponse` para retornar o número sequencial do primeiro registro com falha no lote. Você também pode criar sua própria classe personalizada usando a sintaxe de resposta correta. A seguinte estrutura JSON mostra a sintaxe de resposta necessária:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**nota**  
Se a matriz `batchItemFailures` contém vários itens, o Lambda usa o registro com o menor número de sequência como ponto de verificação. Em seguida, o Lambda repete todos os registros a partir desse ponto de verificação.

## Condições de sucesso e falha
<a name="streams-batchfailurereporting-conditions"></a>

O Lambda trata um lote como um sucesso completo se você retornar qualquer um destes:
+ Uma lista de `batchItemFailure` vazia
+ Uma lista de `batchItemFailure` nula
+ Uma vazia `EventResponse`
+ Uma nula `EventResponse`

O Lambda trata um lote como uma falha absoluta se você retornar qualquer um dos seguintes:
+ Uma string vazia `itemIdentifier`
+ Uma nula `itemIdentifier`
+ Um `itemIdentifier` com um nome de chave inválido

O Lambda faz novas tentativas após falhas com base na sua estratégia de repetição.

## Dividir um lote
<a name="streams-batchfailurereporting-bisect"></a>

Se a invocação falhar e `BisectBatchOnFunctionError` estiver ativado, o lote será dividido independentemente da configuração de `ReportBatchItemFailures`.

Quando uma resposta de sucesso parcial do lote é recebida e tanto `BisectBatchOnFunctionError` quanto `ReportBatchItemFailures` estão ativados, o lote é dividido no número de sequência retornado e o Lambda tenta novamente apenas os registros restantes.

Para simplificar a implementação da lógica de respostas parciais em lote, considere usar o [utilitário de processador em lote](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) da Powertools para AWS Lambda, que lida de maneira automática com essas complexidades para você.

Veja alguns exemplos de código de função que retornam a lista de IDs de mensagens com falha no lote:

------
#### [ .NET ]

**SDK para .NET**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando JavaScript.  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando TypeScript.  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**SDK para PHP**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Relatar falhas de itens em lote do DynamoDB com o Lambda usando PHP.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3).**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando Ruby.  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**SDK para Rust**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Como relatar falhas de itens em lote do DynamoDB com o Lambda usando Rust.  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Usar o Powertools para o processador em lote AWS Lambda
<a name="services-ddb-batchfailurereporting-powertools"></a>

O utilitário de processador em lote do Powertools para AWS Lambda lida de maneira automática com a lógica de respostas parciais em lote, reduzindo a complexidade da implementação de relatórios de falhas em lote. Veja a seguir alguns exemplos usando o processador em lote:

**Python**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Processamento de registros de fluxo do DynamoDB com processador em lote AWS Lambda.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Processamento de registros de fluxo do DynamoDB com processador em lote AWS Lambda.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Processamento de registros de fluxo do DynamoDB com processador em lote AWS Lambda.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
Para ver exemplos completos e instruções de configuração, consulte a [documentação do processador em lote](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Processamento de registros de fluxo do DynamoDB com processador em lote AWS Lambda.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Reter registros descartados para uma origem de eventos do DynamoDB no Lambda
<a name="services-dynamodb-errors"></a>

O tratamento de erros para mapeamentos de origem de eventos do DynamoDB depende se o erro ocorre antes de a função ser invocada ou durante a invocação da função:
+ **Antes da invocação:** se um mapeamento de origem de eventos do Lambda não conseguir invocar a função devido a limitações ou a outros problemas, ele tentará novamente até que os registros expirem ou excedam a idade máxima configurada no mapeamento de origem de eventos ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)).
+ **Durante a invocação:** se a função for invocada, mas retornar um erro, o Lambda tentará novamente até que os registros expirem, excedam a idade máxima ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) ou atinjam a cota de repetição configurada ([MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)). Para erros de função, também é possível pode configurar [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), que divide um lote com falha em dois lotes em lotes, isolando registros com problema e evitando exceder tempos limites. A divisão de lotes não consome a cota de repetição.

Se as medidas de tratamento de erros falharem, o Lambda descartará os registros e continuará processando lotes provenientes da transmissão. Com as configurações padrão, isso significa que um registro inválido pode bloquear o processamento no fragmento afetado por até um dia. Para evitar isso, configure o mapeamento de fontes de eventos da sua função com um número razoável de tentativas e uma idade máxima de registro que se adapte ao seu caso de uso.

## Configurar destinos para invocações com falha
<a name="dynamodb-on-failure-destination-console"></a>

Para reter registros de invocações de mapeamento da origem do evento com falha, adicione um destino ao mapeamento da origem de eventos da função. Cada registro enviado ao destino é um documento JSON que contém metadados sobre a invocação que falhou. Para destinos do Amazon S3, o Lambda também envia todo o registro da invocação junto com os metadados. É possível configurar qualquer tópico do Amazon SNS, fila do Amazon SQS, bucket do Amazon S3 ou Kafka como destino.

Com destinos do Amazon S3, você pode usar o recurso [Notificações de eventos do Amazon S3](https://docs.aws.amazon.com/) para receber notificações quando objetos forem carregados no bucket do S3 de destino. Também é possível configurar as notificações de eventos do S3 para invocar outra função do Lambda para realizar o processamento automatizado em lotes com falha.

Sua função de execução deve ter permissões para o destino:
+ **Para um destino do SQS:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **Para um destino do SNS:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **Para um destino do S3:** [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) e [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Para um destino do Kafka**: [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

É possível configurar um tópico do Kafka como um destino do Kafka em caso de falha para os seus mapeamentos da origem do evento do Kafka. Quando o Lambda não consegue processar registros após exaurir as novas tentativas ou quando os registros excedem a idade máxima, o Lambda envia os registros com falha para o tópico especificado do Kafka para processamento posterior. Consulte [Uso de um tópico do Kafka como destino em caso de falha](kafka-on-failure-destination.md).

Se você habilitou a criptografia com sua própria chave do KMS para um destino do S3, o perfil de execução da função também deve ter permissão para chamar [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html). Se a chave do KMS e o destino do bucket do S3 estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:GenerateDataKey.

Para configurar um destino em caso de falha usando o console, siga estas etapas:

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda.

1. Escolha uma função.

1. Em **Function overview (Visão geral da função)**, escolha **Add destination (Adicionar destino)**.

1. Em **Origem**, escolha **Invocação do mapeamento da origem do evento**.

1. Em **Mapeamento da origem do evento**, escolha uma origem de eventos configurada para essa função.

1. Em **Condição**, selecione **Em caso de falha**. Para invocações de mapeamento da origem de eventos, essa é a única condição aceita.

1. Em **Tipo de destino**, escolha o tipo de destino para o qual o Lambda envia registros de invocação.

1. Em **Destination (Destino)**, escolha um recurso.

1. Escolha **Salvar**.

Também é possível configurar um destino em caso de falha usando a AWS Command Line Interface (AWS CLI). Por exemplo, o seguinte comando [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) adiciona um mapeamento de origem de eventos com um destino SQS em caso de falha a `MyFunction`:

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

O comando [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) a seguir atualiza um mapeamento de origem de eventos para enviar registros de chamada com falha para um destino SNS após duas novas tentativas ou se os registros tiverem mais de uma hora.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

As configurações atualizadas são aplicadas de forma assíncrona e não são refletidas na saída até que o processo seja concluído. Use o comando [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) para visualizar o status atual.

Para remover um destino, forneça uma string vazia como argumento para o parâmetro `destination-config`:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Práticas recomendadas de segurança para destinos do Amazon S3
<a name="ddb-s3-destination-security"></a>

Excluir um bucket do S3 configurado como destino sem remover o destino da configuração da sua função pode criar um risco de segurança. Se outro usuário souber o nome do seu bucket de destino, ele poderá recriar o bucket na Conta da AWS dele. Registros de invocações com falha serão enviados para o bucket do usuário, potencialmente expondo dados da sua função.

**Atenção**  
Para garantir que os registros de invocação da sua função não possam ser enviados para um bucket do S3 em outra Conta da AWS, adicione uma condição ao perfil de execução da função que limite as permissões `s3:PutObject` aos buckets na sua conta. 

O exemplo a seguir mostra uma política do IAM que limita as permissões `s3:PutObject` da função aos bucket da conta. Essa política também dá ao Lambda a permissão `s3:ListBucket` necessária para usar um bucket do S3 como destino.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Para adicionar uma política de permissões ao perfil de execução da função usando o Console de gerenciamento da AWS ou a AWS CLI, consulte as instruções nos seguintes procedimentos:

------
#### [ Console ]

**Para adicionar uma política de permissões ao perfil de execução de uma função (console)**

1. Abra a [página Funções](https://console.aws.amazon.com/lambda/home#/functions) do console do Lambda.

1. Selecione a função do Lambda cujo perfil de execução você queira modificar.

1. Na guia **Configuração**, escolha **Permissões**.

1. Na guia **Perfil de execução**, selecione o **Nome do perfil** da função para abrir a página do console do IAM do perfil.

1. Adicione uma política de permissões ao perfil da seguinte maneira:

   1. No painel **Políticas de permissões**, escolha **Adicionar permissões** e **Criar política em linha**.

   1. No **Editor de políticas**, selecione **JSON**.

   1. Cole a política que você deseja adicionar no editor (substituindo o JSON existente) e escolha **Próximo**.

   1. No campo **Detalhes da política**, insira o **Nome da política**.

   1. Escolha **Criar política**.

------
#### [ AWS CLI ]

**Para adicionar uma política de permissões ao perfil de execução de uma função (CLI)**

1. Crie um documento de política de JSON com as permissões necessárias e salve-o em um diretório local.

1. Use o comando da CLI `put-role-policy` do IAM para adicionar permissões ao perfil de execução da função. Execute o comando a seguir no diretório em que você salvou seu documento de política de JSON e substitua o nome do perfil, o nome da política e o documento da política pelos seus próprios valores.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Exemplo de registro de invocação do Amazon SNS e do Amazon SQS
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

O exemplo a seguir mostra um registro de invocação enviado pelo Lambda para um destino do SQS ou do SNS para um fluxo do DynamoDB.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

É possível usar essas informações para recuperar os registros afetados da transmissão para solução de problemas. Os registros reais não estão incluídos, portanto, você deve processar esses registros e recuperá-los da transmissão antes que eles expirem e sejam perdidos.

### Exemplo de registro de invocação do Amazon S3
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

O exemplo a seguir mostra um registro de invocação que o Lambda envia a um bucket do S3 para um fluxo do DynamoDB. Além de todos os campos do exemplo anterior para destinos do SQS e do SNS, o campo `payload` contém o registro de invocação original como uma string JSON com escape.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

O objeto do S3 que contém o registro de invocação usa a seguinte convenção de nomenclatura:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Implementar o processamento com estado do DynamoDB no Lambda
<a name="services-ddb-windows"></a>

As funções do Lambda podem executar aplicações de processamento contínuo de transmissões. Um stream representa dados não vinculados que fluem continuamente por meio de sua aplicação. Para analisar as informações dessa entrada de atualização contínua, você pode vincular os registros incluídos usando uma janela definida em termos de tempo.

As janelas de tumbling são janelas de tempo distintas que abrem e fecham em intervalos regulares. Por padrão, as invocações do Lambda são sem estado. Não é possível usá-las para processar dados ao longo de várias invocações contínuas sem um banco de dados externo. No entanto, com as janelas de tumbling, você pode manter seu estado em todas as invocações. Esse estado contém o resultado agregado das mensagens previamente processadas para a janela atual. Seu estado pode ter no máximo 1 MB por fragmento. Se exceder esse tamanho, o Lambda encerra a janela antes.

Cada registro de um fluxo pertence a uma janela específica. O Lambda processará cada registro pelo menos uma vez, mas não garantirá que cada registro seja processado apenas uma vez. Em casos raros, como tratamento de erros, alguns registros poderão ser processados mais de uma vez. Os registros são sempre processados em ordem na primeira vez. Se os registros forem processados mais de uma vez, poderão ser processados fora de ordem.

## Agregação e processamento
<a name="streams-tumbling-processing"></a>

Sua função gerenciada pelo usuário é chamada tanto para agregação quanto para processamento dos resultados finais dessa agregação. O Lambda agrega todos os registros recebidos na janela. Você pode receber esses registros em vários lotes, cada um como uma invocação separada. Cada invocação recebe um estado. Assim, ao usar janelas de tumbling, sua resposta de função do Lambda deve conter uma propriedade de `state`. Se a resposta não contiver uma propriedade de `state`, o Lambda considerará esta uma invocação com falha. Para satisfazer essa condição, a função pode retornar um objeto do `TimeWindowEventResponse`, que tem a seguinte forma JSON:

**Example `TimeWindowEventResponse`Valores de**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**nota**  
Para funções Java, recomendamos o uso de um `Map<String, String>` para representar o estado.

No final da janela, a sinalização `isFinalInvokeForWindow` é definida como `true` para indicar que esse é o estado final e que está pronto para processamento. Após o processamento, a janela é concluída e sua invocação final é concluída e, em seguida, o estado é descartado.

No final da janela, o Lambda usa o processamento final para ações sobre os resultados da agregação. Seu processamento final é invocado de forma síncrona. Após a invocação bem-sucedida, sua função define os pontos de verificação no número da sequência e o processamento de streams continua. Se a invocação não for bem-sucedida, sua função do Lambda suspenderá o processamento adicional até uma chamada bem-sucedida.

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Configuração
<a name="streams-tumbling-config"></a>

Você pode configurar janelas em cascata ao criar ou atualizar um mapeamento de fonte de eventos. Para configurar uma janela em cascata, especifique a janela em segundos ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). O comando de exemplo da AWS Command Line Interface (AWS CLI) a seguir cria um mapeamento de fonte de eventos em streaming com uma janela em cascata de 120 segundos. A função do Lambda definida para agregação e processamento é chamada de `tumbling-window-example-function`.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

O Lambda determina os limites da janela em cascata com base no horário em que os registros foram inseridos no stream. Todos os registros têm um carimbo de data/hora aproximado disponível que o Lambda usa para determinar os limites.

As agregações de janelas em cascata não são compatíveis com refragmentação. Quando o fragmento termina, o Lambda considera a janela como fechada e os fragmentos filhos iniciam suas próprias janelas em um novo estado.

As janelas em cascata são totalmente compatíveis com as políticas `maxRetryAttempts` e `maxRecordAge`.

**Example Handler.py: agregação e processamento**  
A função do Python a seguir demonstra como agregar e, em seguida, processar seu estado final:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Parâmetros do Lambda para os mapeamentos das origens de eventos do Amazon DynamoDB
<a name="services-ddb-params"></a>

Todos os tipos de origem de evento Lambda compartilham o mesmo[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)e[UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)Operações de API do. No entanto, apenas alguns dos parâmetros se aplicam aos Fluxos do DynamoDB Streams.


| Parâmetro | Obrigatório | Padrão | Observações | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  Máximo: 10.000.  | 
|  BisectBatchOnFunctionError  |  N  |  false  | nenhuma  | 
|  DestinationConfig  |  N  | N/D  |  Fila padrão do Amazon SQS ou um destino de tópico padrão do Amazon SNS para registros descartados  | 
|  Habilitado  |  N  |  true  | nenhuma  | 
|  EventSourceArn  |  S  | N/D |  O ARN do fluxo de dados ou um consumidor de fluxo  | 
|  FilterCriteria  |  N  | N/D  |  [Controlar quais eventos o Lambda envia para a função](invocation-eventfiltering.md)  | 
|  FunctionName  |  S  | N/D  | nenhuma  | 
|  FunctionResponseTypes  |  N  | N/D |  Para permitir que sua função reporte falhas específicas em um lote, inclua o valor `ReportBatchItemFailures` em `FunctionResponseTypes`. Para obter mais informações, consulte [Configurar resposta em lote parcial com o DynamoDB e o Lambda](services-ddb-batchfailurereporting.md).  | 
|  MaximumBatchingWindowInSeconds  |  N  |  0  | nenhuma  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1  |  -1 significa infinito: registros com falha serão repetidos até que o registro expire. O [limite de retenção de dados para o DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention) é de 24 horas. Mínimo: -1 Máximo: 604.800  | 
|  MaximumRetryAttempts  |  N  |  -1  |  -1 significa infinito: registros com falha são repetidos até que o registro expire Mínimo: 0 Máximo: 10.000.  | 
|  ParallelizationFactor  |  N  |  1  |  Máximo: 10  | 
|  StartingPosition  |  S  | N/D  |  TRIM\$1HORIZON  | 
|  TumblingWindowInSeconds  |  N  | N/D  |  Mínimo: 0 Máximo: 900  | 

# Usar a filtragem de eventos com uma origem de eventos do DynamoDB
<a name="with-ddb-filtering"></a>

É possível usar filtragem de eventos para controlar quais registros de um stream ou fila que o Lambda enviará para a função. Para obter informações gerais sobre como a filtragem de eventos funciona, consulte [Controlar quais eventos o Lambda envia para a função](invocation-eventfiltering.md).

Esta seção tem como foco a filtragem de eventos para as origens de eventos do DynamoDB.

**nota**  
Mapeamentos de origens de eventos do DynamoDB somente são compatíveis com filtragem na chave `dynamodb`.

**Topics**
+ [evento do DynamoDB](#filtering-ddb)
+ [Como filtrar com atributos de tabela](#filtering-ddb-attributes)
+ [Como filtrar com expressões boolianas](#filtering-ddb-boolean)
+ [Usar o operador Exists](#filtering-ddb-exists)
+ [Formato JSON para filtragem do DynamoDB](#filtering-ddb-JSON-format)

## evento do DynamoDB
<a name="filtering-ddb"></a>

Suponha que você tenha uma tabela do DynamoDB com chave primária `CustomerName` e atributos `AccountManager` e `PaymentTerms`. Veja a seguir um exemplo de registro do stream da sua tabela do DynamoDB.

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

Para filtrar com base nos valores de chave e atributo em sua tabela do DynamoDB, use a chave `dynamodb` no registro. As seções a seguir fornecem exemplos de diferentes tipos de filtro.

### Como filtrar com chaves de tabela
<a name="filtering-ddb-keys"></a>

Suponha que você queira que sua função processe somente os registros em que a chave primária `CustomerName` seja “AnyCompany Industries”. O objeto `FilterCriteria` seria como a seguir.

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

Para maior clareza, aqui está o valor de `Pattern` do filtro expandido em JSON simples. 

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

------
#### [ Console ]

Para adicionar esse filtro usando o console, siga as instruções em [Anexar critérios de filtro a um mapeamento de fonte de eventos (console)](invocation-eventfiltering.md#filtering-console) e insira a string a seguir em **Critérios do filtro**.

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## Como filtrar com atributos de tabela
<a name="filtering-ddb-attributes"></a>

Com o DynamoDB, você também pode usar as teclas `NewImage` e `OldImage` para filtrar os valores dos atributos. Suponha que você queira filtrar registros em que o atributo `AccountManager` na imagem mais recente da tabela seja “Pat Candella” ou “Shirley Rodriguez”. O objeto `FilterCriteria` seria como a seguir.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

Para maior clareza, aqui está o valor de `Pattern` do filtro expandido em JSON simples.

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

------
#### [ Console ]

Para adicionar esse filtro usando o console, siga as instruções em [Anexar critérios de filtro a um mapeamento de fonte de eventos (console)](invocation-eventfiltering.md#filtering-console) e insira a string a seguir em **Critérios do filtro**.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## Como filtrar com expressões boolianas
<a name="filtering-ddb-boolean"></a>

Você também pode criar filtros usando expressões boolianas AND. Essas expressões podem incluir os parâmetros de chave e de atributo da tabela. Suponha que você queira filtrar registros em que o valor de `NewImage` de `AccountManager` seja “Pat Candella” e o valor de `OldImage` seja “Terry Whitlock”. O objeto `FilterCriteria` seria como a seguir.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

Para maior clareza, aqui está o valor de `Pattern` do filtro expandido em JSON simples.

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

------
#### [ Console ]

Para adicionar esse filtro usando o console, siga as instruções em [Anexar critérios de filtro a um mapeamento de fonte de eventos (console)](invocation-eventfiltering.md#filtering-console) e insira a string a seguir em **Critérios do filtro**.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**nota**  
A filtragem de eventos do DynamoDB não oferece suporte ao uso de operadores numéricos (igualdade numérica e intervalo numérico). Mesmo que os itens em sua tabela sejam armazenados como números, esses parâmetros são convertidos em strings no objeto do registro JSON.

## Usar o operador Exists
<a name="filtering-ddb-exists"></a>

Devido à forma de estruturação dos objetos de evento JSON do DynamoDB, o uso do operador Exists requer cuidados especiais. O operador Exists só funciona em nós terminais no evento JSON. Portanto, se seu padrão de filtro usar Exists para testar um nó intermediário, ele não funcionará. Considere o seguinte item de tabela do DynamoDB:

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

Talvez você queira criar um padrão de filtro como o seguinte, que teste os eventos que contenham `"Organizations"`:

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

No entanto, esse padrão de filtro nunca retornaria uma correspondência porque `"Organizations"` não é um nó terminal. O exemplo a seguir mostra como usar corretamente o operador Exists para estruturar o padrão de filtro desejado:

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## Formato JSON para filtragem do DynamoDB
<a name="filtering-ddb-JSON-format"></a>

Para filtrar corretamente eventos de origens do DynamoDB, tanto o campo de dados como os critérios de filtro para o campo de dados (`dynamodb`) devem estar em formato JSON válido. Se algum desses campos não estiver em um formato JSON válido, o Lambda descartará a mensagem ou emitirá uma exceção. A tabela a seguir resume o comportamento específico: 


| Formato dos dados recebidos | Formato de filtro padrão para propriedades de dados | Ação resultante | 
| --- | --- | --- | 
|  JSON válido  |  JSON válido  |  Filtros do Lambda com base em seus critérios de filtro.  | 
|  JSON válido  |  Nenhum padrão de filtro para propriedades de dados  |  Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.  | 
|  JSON válido  |  Não JSON  |  O Lambda emite uma exceção no momento da criação ou atualização do mapeamento da fonte de eventos. O padrão de filtro para propriedades de dados deve estar em um formato JSON válido.  | 
|  Não JSON  |  JSON válido  |  O Lambda descarta o registro.  | 
|  Não JSON  |  Nenhum padrão de filtro para propriedades de dados  |  Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.  | 
|  Não JSON  |  Não JSON  |  O Lambda emite uma exceção no momento da criação ou atualização do mapeamento da fonte de eventos. O padrão de filtro para propriedades de dados deve estar em um formato JSON válido.  | 

# Tutorial: Usar o AWS Lambda com o Amazon DynamoDB Streams
<a name="with-ddb-example"></a>

 Neste tutorial, você cria uma função do Lambda para consumir eventos do Amazon DynamoDB Streams.

## Pré-requisitos
<a name="with-ddb-prepare"></a>

### Instalar o AWS Command Line Interface
<a name="install_aws_cli"></a>

Se você ainda não instalou a AWS Command Line Interface, siga as etapas em [Instalar ou atualizar a versão mais recente da AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) para instalá-la.

O tutorial requer um terminal de linha de comando ou um shell para executar os comandos. No Linux e no macOS, use o gerenciador de pacotes e de shell de sua preferência.

**nota**  
No Windows, alguns comandos da CLI do Bash que você costuma usar com o Lambda (como `zip`) não são compatíveis com os terminais integrados do sistema operacional. Para obter uma versão do Ubuntu com o Bash integrada no Windows, [instale o Subsistema do Windows para Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Criar a função de execução
<a name="with-ddb-create-execution-role"></a>

Crie a [função de execução](lambda-intro-execution-role.md) que dá à sua função permissão para acessar recursos do AWS.

**Para criar uma função de execução**

1. Abra a [página Roles](https://console.aws.amazon.com/iam/home#/roles) (Funções) no console do IAM.

1. Selecione **Create role** (Criar função).

1. Crie uma função com as propriedades a seguir.
   + **Entidade confiável**: Lambda.
   + **Permissions** (Permissões): **AWSLambdaDynamoDBExecutionRole**.
   + **Role name (Nome da função** – **lambda-dynamodb-role**.

O **AWSLambdaDynamoDBExecutionRole** tem as permissões necessárias para a função ler itens do DynamoDB e gravar logs no CloudWatch Logs.

## Criar a função
<a name="with-ddb-example-create-function"></a>

Crie uma função do Lambda que processe seus eventos do DynamoDB. O código da função grava alguns dos dados de eventos de entrada no CloudWatch Logs.

------
#### [ .NET ]

**SDK para .NET**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Como consumir um evento do DynamoDB com o Lambda usando .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Como consumir um evento do DynamoDB com o Lambda usando Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Consumir um evento do DynamoDB com o Lambda usando Java.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Consumir um evento do DynamoDB com o Lambda usando JavaScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
Consumir um evento do DynamoDB com o Lambda usando TypeScript.  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**SDK para PHP**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Consumir um evento do DynamoDB com o Lambda usando PHP.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3).**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Como consumir um evento do DynamoDB com o Lambda usando Python.  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Como consumir um evento do DynamoDB com o Lambda usando Ruby.  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**SDK para Rust**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Como consumir um evento do DynamoDB com o Lambda usando Rust.  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**Para criar a função**

1. Copie o código de amostra em um arquivo chamado `example.js`.

1. Crie um pacote de implantação.

   ```
   zip function.zip example.js
   ```

1. Crie uma função do Lambda com o comando `create-function`.

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## Testar a função do Lambda
<a name="with-dbb-invoke-manually"></a>

Nesta etapa, você invoca sua função do Lambda manualmente usando o comando `invoke` da CLI do AWS Lambda e o seguinte exemplo de evento do DynamoDB. Copie o seguinte para um arquivo chamado `input.txt`.

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

Execute o seguinte comando `invoke`. 

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

A opção **cli-binary-format** será necessária se você estiver usando a AWS CLI versão 2. Para que essa seja a configuração padrão, execute `aws configure set cli-binary-format raw-in-base64-out`. Para obter mais informações, consulte [A AWS CLI comporta opções de linha de comando globais](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) no *Guia do usuário da AWS Command Line Interface versão 2*.

A função retorna a string `message` no corpo da resposta. 

Verifique a saída no arquivo `outputfile.txt`.

## Criar uma tabela do DynamoDB com uma transmissão habilitada
<a name="with-ddb-create-buckets"></a>

Criar uma tabela do Amazon DynamoDB com uma transmissão habilitada.

**Como criar uma tabela do DynamoDB**

1. Abra o [console do DynamoDB](https://console.aws.amazon.com/dynamodb).

1. Escolha **Criar tabela**.

1. Crie uma tabela com as configurações a seguir.
   + **O nome da tabela** – **lambda-dynamodb-stream**
   + **Primary key** (Chave primária): **id** (string)

1. Escolha **Criar**.

**Como habilitar fluxos**

1. Abra o [console do DynamoDB](https://console.aws.amazon.com/dynamodb).

1. Escolha **Tables (Tabelas)**.

1. Escolha a tabela **lambda-dynamodb-stream**.

1. Em **Exports and streams** (Exportações e transmissões), escolha **DynamoDB stream details** (Detalhes da tranmissão do DynamoDB).

1. Selecione **Ativar**.

1. Em **Tipo de exibição**, escolha **Somente atributos principais**.

1. Escolha **Ativar fluxo**.

Anote o ARN do fluxo. Você precisará dele na próxima etapa quando for associar a transmissão à função do Lambda. Para obter mais informações sobre como habilitar fluxos, consulte [Capturing table activity with DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) (Capturar atividades de tabelas com o DynamoDB Streams).

## Adicionar uma fonte de eventos no AWS Lambda
<a name="with-ddb-attach-notification-configuration"></a>

Crie um mapeamento da fonte do evento no AWS Lambda. Este mapeamento de fontes de eventos associa a transmissão do DynamoDB à função do Lambda. Depois de criar esse mapeamento da fonte do evento, o AWS Lambda começa a sondar o fluxo.

Execute o comando AWS CLI a seguir da `create-event-source-mapping`. Depois que o comando for executado, anote o UUID. Você precisará deste UUID para se referir ao mapeamento da fonte do evento em todos os comandos, por exemplo, ao excluir o mapeamento da fonte do evento.

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 Isso cria um mapeamento entre a transmissão do DynamoDB especificado e a função do Lambda. Você pode associar uma transmissão do DynamoDB a várias funções do Lambda e associar a mesma função do Lambda a várias transmissões. No entanto, as funções do Lambda compartilharão o throughput de leitura para os fluxos dos quais compartilham. 

Você pode obter a lista de mapeamentos de fontes de eventos executando o comando a seguir.

```
aws lambda list-event-source-mappings
```

Esta lista retorna todos os mapeamentos de fontes de eventos que você criou, e para cada mapeamento mostra o `LastProcessingResult`, entre outras coisas. Este campo será usado para fornecer uma mensagem informativa, se houver problemas. Valores como `No records processed` (indica que o AWS Lambda não iniciou a sondagem ou de que não há registros no fluxo) e `OK` (indica que o AWS Lambda foi bem-sucedido na leitura dos registros do fluxo e invocou a função do Lambda) indicam que não há problemas. Se houver problemas, você receberá uma mensagem de erro.

Se você tiver muitos mapeamentos da fonte do evento, use o parâmetro de nome da função para refinar os resultados.

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## Testar a configuração
<a name="with-ddb-final-integration-test-no-iam"></a>

Teste a experiência completa. À medida que você executa atualizações nas tabelas, o DynamoDB grava registros de eventos na transmissão. Quando o AWS Lambda sonda o stream, ele detecta novos registros no stream e executa a função do Lambda em seu nome, passando os eventos para a função. 

1. No console do DynamoDB, adicione, atualize ou exclua itens da tabela. O DynamoDB grava registros dessas ações na transmissão.

1. AWS LambdaO sonda a transmissão e, ao detectar atualizações nela, invoca a função do Lambda passando os dados do evento que ele encontrar na transmissão.

1. A função é executada e cria logs no Amazon CloudWatch. Você pode verificar os logs relatados no console do Amazon CloudWatch.

## Próximas etapas
<a name="with-ddb-next-steps"></a>

Este tutorial mostrou conceitos básicos do processamento de eventos de stream do DynamoDB com o Lambda. Para workloads de produção, considere implementar uma lógica de resposta parcial em lote para lidar com falhas de registros individuais de forma mais eficiente. O [utilitário de processador em lote](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) da Powertools para AWS Lambda está disponível em Python, TypeScript, .NET e Java e fornece uma solução robusta para isso, tratando automaticamente a complexidade das respostas parciais em lote e reduzindo o número de novas tentativas para registros processados com êxito.

## Limpe os recursos
<a name="cleanup"></a>

Agora você pode excluir os recursos criados para este tutorial, a menos que queira mantê-los. Excluindo os recursos da AWS que você não está mais usando, você evita cobranças desnecessárias em sua Conta da AWS.

**Como excluir a função do Lambda**

1. Abra a página [Functions](https://console.aws.amazon.com/lambda/home#/functions) (Funções) no console do Lambda.

1. Selecione a função que você criou.

1. Selecione **Ações**, **Excluir**.

1. Digite **confirm** no campo de entrada de texto e escolha **Delete** (Excluir).

**Para excluir a função de execução**

1. Abra a página [Roles](https://console.aws.amazon.com/iam/home#/roles) (Funções) no console do IAM.

1. Selecione a função de execução que você criou.

1. Escolha **Excluir**.

1. Insira o nome do perfil no campo de entrada de texto e escolha **Delete** (Excluir).

**Para excluir uma tabela do DynamoDB**

1. Abra a [página Tables](https://console.aws.amazon.com//dynamodb/home#tables:) (Tabelas) no console do DynamoDB.

1. Selecione a tabela que você criou.

1. Escolha **Excluir**.

1. Digite **delete** na caixa de texto.

1. Selecione **Delete table** (Excluir tabela).