

# Capturar dados de alterações para o DynamoDB Streams
<a name="Streams"></a>

 O DynamoDB Streams captura uma sequência em ordem temporal de modificações em nível de item em qualquer tabela do DynamoDB e armazena essas informações em um log por até 24 horas. As aplicações podem acessar esse log e visualizar os itens de dados à medida que eles aparecem antes e depois de serem modificados, quase em tempo real.

 A criptografia em repouso criptografa os dados em fluxos do DynamoDB. Para obter mais informações, consulte [Criptografia em repouso do DynamoDB](EncryptionAtRest.md).

Um *fluxo do DynamoDB* é um fluxo ordenado de informações sobre alterações em itens de uma tabela do DynamoDB. Quando você habilita um fluxo em uma tabela, o DynamoDB captura informações sobre todas as modificações em itens de dados na tabela.

Sempre que uma aplicação cria, atualiza ou exclui itens nessa tabela, o DynamoDB Streams grava um registro de fluxo com os atributos de chave primária dos itens que foram modificados. Um *registro de fluxo* contém informações sobre uma modificação de dados em um único item de uma tabela do DynamoDB. É possível configurar o stream de modo que os registros de stream capturem informações adicionais, como as imagens "antes" e "depois" de itens modificados.

O DynamoDB Streams ajuda a garantir:
+ Cada registro do fluxo aparece exatamente uma vez no fluxo.
+ Para cada item modificado em uma tabela do DynamoDB, os registros de fluxo aparecem na mesma sequência que as modificações reais no item.

O DynamoDB Streams grava registros de fluxo em tempo quase real para que você possa criar aplicações que consomem esses fluxos e executam ações com base no conteúdo.

**Topics**
+ [Endpoints para DynamoDB Streams](#Streams.Endpoints)
+ [Habilitar um fluxo](#Streams.Enabling)
+ [Ler e processar um fluxo](#Streams.Processing)
+ [DynamoDB Streams e vida útil](time-to-live-ttl-streams.md)
+ [Usar o adaptador do DynamoDB Streams Kinesis Adapter para processar registros de fluxos](Streams.KCLAdapter.md)
+ [API de baixo nível do DynamoDB Streams: exemplo em Java](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB Streams e acionadores do AWS Lambda](Streams.Lambda.md)
+ [DynamoDB Streams e Apache Flink](StreamsApacheFlink.xml.md)

## Endpoints para DynamoDB Streams
<a name="Streams.Endpoints"></a>

AWSO mantém endpoints separados para fluxos do DynamoDB e DynamoDB Streams. Para trabalhar com índices e tabelas de banco de dados, sua aplicação precisa acessar um endpoint do DynamoDB. Para ler e processar registros do DynamoDB Streams, sua aplicação precisa acessar um endpoint do DynamoDB Streams na mesma região.

O DynamoDB Streams oferece dois conjuntos de endpoints. Eles são:
+ **Endpoints somente IPv4**: endpoints com a convenção de nomenclatura `streams.dynamodb.<region>.amazonaws.com`.
+ **Endpoints de pilha dupla**: novos endpoints compatíveis com IPv4 e IPv6 e que seguem a convenção de nomenclatura `streams-dynamodb.<region>.api.aws`.

**nota**  
Para obter uma lista completa de regiões e endpoints do DynamoDB e do DynamoDB Streams, consulte [Regiões e endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html) na *Referência geral da AWS*.

Os AWS SDKs fornecem clientes separados para o DynamoDB e o DynamoDB Streams. Dependendo das suas necessidades, sua aplicação pode acessar um endpoint do DynamoDB, um endpoint do DynamoDB Streams ou ambos ao mesmo tempo. Para conectar aos dois endpoints, a aplicação precisa instanciar dois clientes, um para o DynamoDB e outro para o DynamoDB Streams.

## Habilitar um fluxo
<a name="Streams.Enabling"></a>

Você pode habilitar um stream em uma nova tabela ao criá-la usando a AWS CLI ou um dos AWS SDKs. Também pode habilitar ou desabilitar um stream em uma tabela existente ou alterar as configurações de um stream. O DynamoDB Streams opera de forma assíncrona e, portanto, não haverá impacto sobre a performance de uma tabela se você habilitar um stream.

A maneira mais fácil de gerenciar o DynamoDB Streams é usar o Console de gerenciamento da AWS.

1. Faça login no Console de gerenciamento da AWS e abra o console do DynamoDB em [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. No painel do console do DynamoDB, escolha **Tables** (Tabelas) e selecione uma tabela existente.

1. Escolha a guia **Exports and streams** (Exportações e fluxos).

1. Na seção **Detalhes do stream do DynamoDB**, escolha **Ativar**.

1. Na página **Ativar fluxo do DynamoDB**, escolha as informações que serão gravadas no fluxo sempre que os dados da tabela forem modificados:
   + **Key attributes only (Somente atributos de chaves)**: somente os atributos de chaves do item modificado.
   + **New image** (Nova imagem): o item inteiro como é exibido depois de modificado.
   + **Old image** (Imagem antiga): o item inteiro como era exibido antes de modificado.
   + **New and old images** (Imagens nova e antiga): as imagens nova e antiga do item.

   Quando estiver de acordo com as configurações, escolha **Ativar fluxo**.

1. (Opcional) Para desabilitar um fluxo existente, escolha **Desativar** em **Detalhes do stream do DynamoDB**.

Você também pode usar as operações da API `CreateTable` ou `UpdateTable` para habilitar ou modificar um fluxo. O parâmetro `StreamSpecification` determina como o fluxo é configurado:
+ `StreamEnabled`: especifica se um fluxo está habilitado (`true`) ou desabilitado (`false`) para a tabela.
+ `StreamViewType`: especifica as informações que serão gravadas no fluxo sempre que os dados na tabela forem modificados:
  + `KEYS_ONLY`: somente os atributos de chaves do item modificado.
  + `NEW_IMAGE`: o item inteiro como é exibido depois de ser modificado.
  + `OLD_IMAGE`: o item inteiro como era exibido antes de ser modificado.
  + `NEW_AND_OLD_IMAGES`: as imagens nova e antiga do item.

É possível habilitar ou desabilitar um fluxo a qualquer momento. No entanto, você receberá uma `ValidationException` se tentar habilitar um fluxo em uma tabela que já tenha um fluxo. Você também receberá uma `ValidationException` se tentar desabilitar um fluxo em uma tabela que não tenha um fluxo.

Quando você define `StreamEnabled` como `true`, o DynamoDB cria um novo fluxo com um descritor de streaming exclusivo atribuído a ele. Se você desabilitar e reabilitar um fluxo na tabela, será criado um fluxo com um descritor diferente.

Cada fluxo é identificado exclusivamente por um nome do recurso da Amazon (ARN). Veja a seguir um ARN de exemplo para um fluxo em uma tabela do DynamoDB chamada `TestTable`.

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

Para determinar o descritor de streaming mais recente de uma tabela, emita uma solicitação `DescribeTable` do DynamoDB e procure o elemento `LatestStreamArn` na resposta.

**nota**  
Não é possível editar um `StreamViewType` após a configuração de um fluxo. Se você precisar fazer alterações em um fluxo depois que ele tiver sido configurado, será necessário desativar o fluxo atual e criar outro.

## Ler e processar um fluxo
<a name="Streams.Processing"></a>

Para ler e processar um fluxo, sua aplicação deve se conectar a um endpoint do DynamoDB Streams e emitir solicitações de API.

Um fluxo consiste em *registros de fluxo*. Cada registro de stream representa uma única modificação de dados na tabela do DynamoDB à qual o fluxo pertence. Cada registro de fluxo recebe um número de sequência, refletindo a ordem em que ele foi publicado no fluxo.

Os registros de fluxo estão organizados em grupos ou *fragmentos*. Cada fragmento atua como um contêiner para vários registros de fluxo e contém informações necessárias para acessar esses registros e fazer iterações neles. Os registros de fluxo em um fragmento são removidos automaticamente depois de 24 horas.

Fragmentos são efêmeros: eles são criados e excluídos automaticamente, conforme necessário. Qualquer fragmento também pode ser dividido em vários novos fragmentos, o que também ocorre automaticamente. (Também é possível que um fragmento pai tenha apenas um fragmento filho.) Um fragmento pode ser dividido em resposta a altos níveis de atividades de gravação em sua tabela principal e, portanto, as aplicações podem processar registros de vários fragmentos em paralelo.

Se você desabilitar um fluxo, todos os fragmentos que estiverem abertos serão fechados. Os dados no stream continuarão legíveis por 24 horas.

Como fragmentos têm uma linhagem (pai e filhos), uma aplicação sempre deverá processar um fragmento pai antes de um fragmento filho. Isso ajuda a garantir que os registros de fluxo também sejam processados na ordem correta. (Se você usa o DynamoDB Streams Kinesis Adapter, isso será feito para você. Sua aplicação processará os fragmentos e os registros de fluxo na ordem correta. Ela manipulará automaticamente fragmentos novos ou expirados, bem como fragmentos que são divididos enquanto a aplicação está em execução. Para obter mais informações, consulte [Usar o adaptador do DynamoDB Streams Kinesis Adapter para processar registros de fluxos](Streams.KCLAdapter.md).)

O diagrama a seguir mostra a relação entre um fluxo, os fragmentos nesse fluxo e os registros de fluxo nesses fragmentos.

![\[Estrutura do DynamoDB Streams. Os registros de fluxo que representam modificações de dados são organizados em fragmentos.\]](http://docs.aws.amazon.com/pt_br/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**nota**  
Se você executar uma operação `PutItem` ou `UpdateItem` que não altere nenhum dado em um item, o DynamoDB Streams *não* gravará um registro de fluxo para essa operação.

Para acessar um fluxo e processar os registros de fluxo dentro dele, você deve fazer o seguinte:
+ Determinar o ARN exclusivo do fluxo que deseja acessar.
+ Determinar quais fragmentos no fluxo contêm os registros de fluxo desejados.
+ Acessar os fragmentos e recuperar os registros de fluxo desejados.

**nota**  
No máximo dois processos devem estar lendo simultaneamente do mesmo fragmento de fluxo. Ter mais de dois leitores por fragmento pode resultar em controle de utilização.

A API do DynamoDB Streams fornece as seguintes ações para uso por programas de aplicações:
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)`: retorna uma lista de descritores de fluxos para a conta e o endpoint atuais. Opcionalmente, você pode solicitar apenas os descritores de fluxo de um nome de tabela específico.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)`: retorna informações sobre um streaming, incluindo o status atual do streaming, o Nome de recurso da Amazon (ARN), a composição de seus estilhaços e sua tabela correspondente do DynamoDB Você pode opcionalmente usar o campo `ShardFilter` para recuperar o fragmento filho existente associado ao fragmento pai.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)`: retorna um *iterador de fragmentos* que descreve um local dentro de um fragmento. Você pode solicitar que o iterador forneça acesso ao ponto mais antigo, o ponto mais novo ou um ponto específico no fluxo.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)`: retorna os registros do fluxo de um determinado fragmento. Você deve fornecer o iterador de fragmentos retornado de uma solicitação `GetShardIterator`.

Para obter descrições completas dessas operações da API, incluindo solicitações e respostas de exemplo, consulte a [Referência da API do Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html).

### Descoberta de fragmentos
<a name="Streams.ShardDiscovery"></a>



Descubra novos fragmentos em seu stream do DynamoDB com dois métodos poderosos. Como usuário do Amazon DynamoDB Streams, você tem duas maneiras eficazes de rastrear e identificar novos fragmentos:

**Pesquisando toda a topologia do stream**  
Use a API `DescribeStream` para pesquisar regularmente o stream. Isso retorna todos os fragmentos no stream, incluindo novos fragmentos que tenham sido criados. Ao comparar os resultados ao longo do tempo, você pode detectar fragmentos recém-adicionados.

**Descobrindo fragmentos filhos**  
Use a API `DescribeStream` com o parâmetro `ShardFilter` para encontrar um subconjunto de fragmentos. Ao especificar um fragmento pai na solicitação, o DynamoDB Streams retornará seus fragmentos filhos imediatos. Essa abordagem é útil quando você só precisa rastrear a linhagem do fragmento sem escanear todo o stream.   
Os aplicativos que consomem dados do DynamoDB Streams podem fazer a transição eficiente da leitura de um fragmento fechado para seu fragmento filho usando esse parâmetro `ShardFilter`, evitando chamadas repetidas à API `DescribeStream` para recuperar e percorrer o mapa de fragmentos de todos os fragmentos fechados e abertos. Isso ajuda a descobrir rapidamente os fragmentos filhos após o fechamento de um fragmento pai, tornando seus aplicativos de processamento de stream mais responsivos e econômicos.

Ambos os métodos permitem que você fique por dentro da estrutura em evolução do seu DynamoDB Streams, garantindo que você nunca perca atualizações críticas de dados ou modificações de fragmentos.

### Limite de retenção de dados para o DynamoDB Streams
<a name="Streams.DataRetention"></a>

Todos os dados no DynamoDB Streams estão sujeitos a um tempo de vida de 24 horas. É possível recuperar e analisar as atividades das últimas 24 horas de qualquer tabela. No entanto, os dados mais antigos que 24 horas estão suscetíveis a remoção a qualquer momento.

Se você desabilitar um fluxo em uma tabela, os dados nesse fluxo continuarão legíveis por 24 horas. Depois desse tempo, os dados expirarão, e os registros de fluxo serão excluídos automaticamente. Não há nenhum mecanismo para excluir manualmente um fluxo existente. Aguarde até que o limite de retenção expire (24 horas), e todos os registros do fluxo sejam excluídos.

# DynamoDB Streams e vida útil
<a name="time-to-live-ttl-streams"></a>

Você pode fazer backup ou processar os itens excluídos por [vida útil](TTL.md) (TTL) habilitando o Amazon DynamoDB Streams na tabela e processando os registros de fluxos dos itens expirados. Para obter mais informações, consulte [Ler e processar um fluxo](Streams.md#Streams.Processing).

O registro de fluxos contém um campo de identidade do usuário `Records[<index>].userIdentity`.

Os itens excluídos pelo processo de vida útil após a expiração têm os seguintes campos:
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**nota**  
Quando você usa o TTL em uma tabela global, a região em que o TTL foi executado terá o campo `userIdentity` definido. Esse campo não será definido em outras regiões quando a exclusão for replicada.

O JSON a seguir mostra a parte relevante de um único registro de fluxos.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## Usar o DynamoDB Streams e o Lambda para arquivar itens excluídos do TTL
<a name="streams-archive-ttl-deleted-items"></a>

Combinar a [vida útil (TTL) do DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html), o [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) e o [AWS Lambda](https://aws.amazon.com/lambda/) pode ajudar a simplificar os dados de arquivo, reduzir os custos de armazenamento do DynamoDB e diminuir a complexidade do código. O uso do Lambda como consumidor de fluxo oferece muitas vantagens, principalmente a redução de custos em comparação com outros consumidores, como a Kinesis Client Library (KCL). Você não é cobrado por chamadas de API `GetRecords` no fluxo do DynamoDB ao usar o Lambda para consumir eventos, e o Lambda pode fornecer filtragem de eventos por meio da identificação de padrões JSON em um evento de fluxo. Com a filtragem de conteúdo de padrão de evento, é possível definir até cinco filtros diferentes para controlar quais eventos são enviados ao Lambda para processamento. Isso ajuda a reduzir as invocações de suas funções do Lambda, simplifica o código e diminui o custo geral.

Embora o DynamoDB Streams contenha todas as modificações de dados, como as ações `Create`, `Modify` e `Remove`, isso pode resultar em invocações indesejadas da função do Lambda de arquivo. Por exemplo, digamos que você tenha uma tabela com 2 milhões de modificações de dados por hora fluindo para o fluxo, mas menos de 5% delas são exclusões de itens que expirarão no processo de TTL e precisam ser arquivadas. Com [filtros de origem de eventos do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), a função do Lambda invocará apenas 100 mil vezes por hora. O resultado da filtragem de eventos é que você é cobrado apenas pelas invocações necessárias, e não pelos 2 milhões de invocações que você teria sem a filtragem de eventos.

A filtragem de eventos é aplicada ao [mapeamento da origem de eventos do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html), que é um recurso que lê com base em um evento escolhido (o fluxo do DynamoDB) e invoca uma função do Lambda. No diagrama a seguir, você pode ver como um item excluído por vida útil é consumido por uma função do Lambda usando fluxos e filtros de eventos.

![\[Um item excluído por meio do processo de TTL inicia uma função do Lambda que usa fluxos e filtros de eventos.\]](http://docs.aws.amazon.com/pt_br/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### Padrão de filtro de eventos da vida útil do DynamoDB
<a name="ttl-event-filter-pattern"></a>

A adição do JSON a seguir aos [critérios de filtro](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html) do mapeamento de origem de eventos permite que você invoque sua função do Lambda somente para itens excluídos por TTL:

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### Crie um mapeamento da origem de eventos no AWS Lambda.
<a name="create-event-source-mapping"></a>

Use os trechos de código a seguir para criar um mapeamento de origem de eventos filtrado que você possa conectar ao fluxo do DynamoDB de uma tabela. Cada bloco de código inclui o padrão de filtro de eventos.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# Usar o adaptador do DynamoDB Streams Kinesis Adapter para processar registros de fluxos
<a name="Streams.KCLAdapter"></a>

Usar o Amazon Kinesis Adapter é a forma recomendada para consumir fluxos do Amazon DynamoDB. A API do DynamoDB Streams é intencionalmente semelhante à do Kinesis Data Streams. Em ambos os serviços, os fluxos de dados são compostos de fragmentos, os quais são contêineres de registros de stream. Ambas as APIs de serviços contêm as operações `ListStreams`, `DescribeStream`, `GetShards` e `GetShardIterator`. (Embora essas ações do DynamoDB Streams sejam semelhantes às suas equivalentes no Kinesis Data Streams, elas não são 100% idênticas.)

Como um usuário do DynamoDB Streams, você pode utilizar os padrões de design encontrados no KCL para processar fragmentos e registros de fluxos do DynamoDB Streams. Para isso, você pode usar o DynamoDB Streams Kinesis Adapter. O Kinesis Adapter implementa a interface do Kinesis Data Streams para que a KCL possa ser usada para consumir e processar registros do DynamoDB Streams. Para obter instruções sobre como configurar e instalar o DynamoDB Streams Kinesis Adapter, consulte o [repositório do GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter).

Você pode escrever aplicações para Kinesis Data Streams usando a Kinesis Client Library (KCL). A KCL simplifica a codificação fornecendo abstrações úteis acima da API de baixo nível do Kinesis Data Streams. Para obter mais informações sobre a KCL, consulte [Desenvolver consumidores usando a biblioteca de clientes do Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams Developer Guide*.

O DynamoDB recomenda usar a KCL versão 3.x com o AWS SDK para Java v2.x. Durante o período de transição, a versão atual do DynamoDB Streams Kinesis Adapter versão 1.x com o AWS SDK para AWS SDK para Java v1.x continuará recebendo suporte total ao longo do ciclo de vida, de acordo com o estabelecido em alinhamento com a [política de manutenção de AWS SDKs e ferramentas](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html).

**nota**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. A KCL 1.x chegará ao fim do suporte em 30 de janeiro de 2026. É altamente recomendável que você migre suas aplicações de KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a [página da Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) no GitHub. Para ter mais informações sobre as versões mais recentes da KCL, consulte [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html).. Para obter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte Migração da KCL 1.x para a KCL 3.x .

O diagrama a seguir mostra como essas bibliotecas interagem entre si.

![\[Interação entre o DynamoDB Streams, o Kinesis Data Streams e o KCL para processar registros do DynamoDB Streams.\]](http://docs.aws.amazon.com/pt_br/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Com o DynamoDB Streams Kinesis Adapter implementado, você pode começar a desenvolver a interface da KCL com as chamadas de API perfeitamente direcionadas no endpoint do DynamoDB Streams.

Quando a aplicação é iniciada, ela chama a KCL para instanciar um operador. Você deve fornecer ao operador informações de configuração da aplicação, como o descritor do fluxo e as credenciais da AWS, além do nome de uma classe de processador de registro que você fornecer. Como ele executa o código no processador de registro, o operador executa as seguintes tarefas:
+ Conecta-se ao stream
+ Enumera os fragmentos no fluxo
+ Verifica e enumera fragmentos filhos de um fragmento pai fechado dentro do fluxo
+ Coordena as associações do estilhaço com outros operadores (se houver)
+ Cria uma instância de um processador de registro para cada fragmento que gerencia
+ Extrai registros do fluxo
+ Escala a taxa de chamadas de API GetRecords quanto há alto throughput (se o modo de recuperação estiver configurado).
+ Envia os registros ao processador de registros correspondente
+ Registros processados pelos pontos de verificação
+ Equilibra as associações de estilhaço-operador quando a contagem de instância de operadores muda
+ Equilibra as associações de fragmento-operador quando os fragmentos se dividem

O adaptador da KCL permite o modo de recuperação, um recurso de ajuste automático da taxa de chamadas para lidar com aumentos temporários de throughput. Quando o atraso no processamento de fluxos excede um limite configurável (padrão: um minuto), o modo de recuperação escala a frequência de chamadas de API GetRecords de acordo com um valor configurável (padrão: três vezes) para recuperar os registros mais depressa e depois volta ao normal quando o atraso diminui. Isso é fundamental durante períodos de alto throughput em que a atividade de gravação do DynamoDB pode sobrecarregar os consumidores usando taxas de sondagem padrão. O modo de recuperação pode ser habilitado por meio do parâmetro de configuração `catchupEnabled` (padrão: false).

**nota**  
Para obter uma descrição dos conceitos da KCL aqui listados, consulte [Desenvolver consumidores usando a biblioteca clientes do Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*.  
Para obter mais informações sobre como usar fluxos com o AWS Lambda, consulte [DynamoDB Streams e acionadores do AWS Lambda](Streams.Lambda.md)

# Migrar da KCL 1.x para a KCL 3.x
<a name="streams-migrating-kcl"></a>

## Visão geral
<a name="migrating-kcl-overview"></a>

Este guia oferece instruções para migrar uma aplicação de consumidor da KCL 1.x para a KCL 3.x. Devido a diferenças de arquitetura entre a KCL 1.x e a KCL 3.x, a migração requer a atualização de vários componentes para garantir compatibilidade.

A KCL 1.x usa classes e interfaces diferentes em comparação com o KCL 3.x. Você deve primeiro migrar o processador de registros, a fábrica do processador de registros e as classes de operador para o formato compatível com a KCL 3.x e seguir as etapas de migração da KCL 1.x para a KCL 3.x.

## Etapas da migração
<a name="migration-steps"></a>

**Topics**
+ [Etapa 1: migrar o processador de registros](#step1-record-processor)
+ [Etapa 2: migrar a fábrica do processador de registros](#step2-record-processor-factory)
+ [Etapa 3: migrar o operador](#step3-worker-migration)
+ [Etapa 4: visão geral e recomendações de configuração da KCL 3.x](#step4-configuration-migration)
+ [Etapa 5: migrar da KCL 2.x para a KCL 3.x](#step5-kcl2-to-kcl3)

### Etapa 1: migrar o processador de registros
<a name="step1-record-processor"></a>

Este exemplo mostra um processador de registros implementado para o DynamoDB Streams Kinesis Adapter da KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Como migrar a classe RecordProcessor**

1. Altere as interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` para `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` da seguinte forma:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Atualize as instruções de importação para os métodos `initialize` e `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Substitua o método `shutdownRequested` pelos novos métodos a seguir: `leaseLost`, `shardEnded`, e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Esta é a versão atualizada da classe de processador de registros:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**nota**  
O DynamoDB Streams Kinesis Adapter agora usa o modelo de registro SDKv2. No SDKv2, objetos `AttributeValue` complexos (`BS`, `NS`, `M`, `L` e `SS`) nunca retornam null. Use os métodos `hasBs()`, `hasNs()`, `hasM()`, `hasL()` e `hasSs()` para verificar se esses valores existem.

### Etapa 2: migrar a fábrica do processador de registros
<a name="step2-record-processor-factory"></a>

A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja o seguinte exemplo de fábrica da KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Migrar para `RecordProcessorFactory`**
+ Altere a interface implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` para `software.amazon.kinesis.processor.ShardRecordProcessorFactory` da seguinte forma:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Veja o seguinte exemplo de fábrica de processador de registros em 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Etapa 3: migrar o operador
<a name="step3-worker-migration"></a>

Na versão 3.0 da KCL, uma nova classe, chamada **Scheduler**, substitui a classe **Worker**. Veja o seguinte exemplo de operador da KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Para migrar o operador**

1. Altere a instrução `import` para a classe `Worker` para as instruções de importação para as classes `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importe `StreamTracker` e altere a importação de `StreamsWorkerFactory` para `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Escolha a posição por meio da qual iniciar a aplicação. Ela pode ser `TRIM_HORIZON` ou `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Crie uma instância de `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Crie o objeto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Crie o objeto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Crie o `Scheduler` usando `ConfigsBuilder` como mostrado no seguinte exemplo:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Importante**  
A configuração `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantém a compatibilidade entre o DynamoDB Streams Kinesis Adapter para a KCL v3 e a KCL v1, não entre a KCL v2 e a v3.

### Etapa 4: visão geral e recomendações de configuração da KCL 3.x
<a name="step4-configuration-migration"></a>

Para obter uma descrição detalhada das configurações introduzidas após a KCL 1.x que são relevantes na KCL 3.x, consulte [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) e [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Importante**  
Em vez de criar objetos diretamente de `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` e `retrievalConfig`, recomendamos usar `ConfigsBuilder` para definir configurações na KCL 3.x e versões posteriores para evitar problemas de inicialização do Agendador. O `ConfigsBuilder` oferece uma maneira mais flexível e sustentável de configurar sua aplicação da KCL.

#### Configurações com valor padrão de atualização na KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Na KCL versão 1.x, o valor padrão para `billingMode` é definido como `PROVISIONED`. No entanto, na KCL versão 3.x, o padrão `billingMode` é `PAY_PER_REQUEST` (modo sob demanda). Recomendamos que você use o modo de capacidade sob demanda em sua tabela de concessões para ajustar automaticamente a capacidade com base no uso. Para obter orientações sobre como usar a capacidade provisionada para suas tabelas de concessões, consulte [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Na KCL versão 1.x, o valor padrão para `idleTimeBetweenReadsInMillis` é definido como 1.000 (ou 1 segundo). A KCL versão 3.x define o valor padrão para i`dleTimeBetweenReadsInMillis` como 1.500 (ou 1,5 segundo), mas o Amazon DynamoDB Streams Kinesis Adapter substitui esse valor padrão, definindo-o como 1.000 (ou 1 segundo).

#### Novas configurações na KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Essa configuração define o intervalo de tempo antes que os fragmentos recém-descobertos comecem a ser processados, e é calculada como 1,5 × `leaseAssignmentIntervalMillis`. Se essa configuração não for definida explicitamente, o intervalo de tempo será padronizado como 1,5 × `failoverTimeMillis`. O processamento de novos fragmentos exige a verificação da tabela de concessões e a consulta a um índice secundário global (GSI) na tabela de concessões. A redução de `leaseAssignmentIntervalMillis` aumenta a frequência dessas operações de verificação e consulta, aumentando os custos do DynamoDB. Recomendamos definir esse valor como 2 mil (ou 2 segundos) para minimizar o atraso no processamento de novos fragmentos.

`shardConsumerDispatchPollIntervalMillis`  
Essa configuração define o intervalo entre pesquisas sucessivas feitas pelo consumidor do fragmento para acionar transições de estado. Na KCL versão 1.x, esse comportamento era controlado pelo parâmetro `idleTimeInMillis`, que não era exposto como uma definição configurável. Na KCL versão 3.x, recomendamos definir essa configuração para corresponder ao valor usado em ` idleTimeInMillis` na configuração da KCL versão 1.x.

### Etapa 5: migrar da KCL 2.x para a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Para garantir uma transição tranquila e a compatibilidade com a versão mais recente da Kinesis Client Library (KCL), siga as etapas de 5 a 8 nas instruções do guia de migração para [atualizar da KCL 2.x para a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Para solucionar problemas comuns da KCL 3.x, consulte [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Reverter para a versão anterior da KCL
<a name="kcl-migration-rollback"></a>

Este tópico explica como reverter sua aplicação de consumidor para a versão anterior da KCL. O processo de reversão consiste em duas etapas:

1. Execute a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Reimplante o código da versão anterior da KCL.

## Etapa 1: executar a Ferramenta de Migração da KCL
<a name="kcl-migration-rollback-step1"></a>

Quando precisar reverter para a versão anterior da KCL, você deve executar a Ferramenta de Migração da KCL. Essa ferramenta executa duas tarefas importantes:
+ Ela remove uma tabela de metadados chamada tabela de métricas do operador e o índice secundário global na tabela de concessões no DynamoDB. Esses artefatos são criados pela KCL 3.x, mas não são necessários quando você reverte para a versão anterior.
+ Ela faz com que todos os operadores funcionem em um modo compatível com a KCL 1.x e comecem a usar o algoritmo de balanceamento de carga usado nas versões anteriores da KCL. Se você tiver problemas com o novo algoritmo de balanceamento de carga na KCL 3.x, isso mitigará o problema imediatamente.

**Importante**  
A tabela de estados do coordenador no DynamoDB deve existir e não deve ser excluída durante o processo de migração, reversão e avanço.

**nota**  
É importante que todos os operadores em sua aplicação de consumidor usem o mesmo algoritmo de balanceamento de carga em um determinado momento. A Ferramenta de Migração da KCL garante que todos os operadores em sua aplicação de consumidor da KCL 3.x mudem para o modo compatível com a KCL 1.x para que todos os operadores executem o mesmo algoritmo de balanceamento de carga durante a reversão da aplicação para a versão anterior da KCL.

Você pode baixar a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) no diretório de scripts do [repositório da KCL no GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master). Execute o script usando um operador ou host com as permissões apropriadas para gravar na tabela de estados do coordenador, na tabela de métricas do operador e na tabela de concessões. As [permissões do IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) apropriadas devem estar configuradas para aplicações de consumidor da KCL. Execute o script uma única vez para cada aplicação da KCL usando o comando especificado:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parâmetros
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Substitua *region* pela Região da AWS.

`--application_name`  
Esse parâmetro é obrigatório se você estiver usando nomes padrão para suas tabelas de metadados do DynamoDB (tabela de concessões, tabela de estados do coordenador e tabela de métricas do operador). Se você tiver especificado nomes personalizados para essas tabelas, poderá omitir esse parâmetro. Substitua *applicationName* pelo nome da aplicação da KCL. A ferramenta usa esse nome para obter os nomes de tabela padrão se os nomes personalizados não forem fornecidos.

`--lease_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de concessões na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *leaseTableName* pelo nome da tabela personalizada que você especificou para a tabela de concessões.

`--coordinator_state_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de estados do coordenador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *coordinatorStateTableName* pelo nome da tabela personalizada que você especificou para a tabela de estados do coordenador.

`--worker_metrics_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de métricas do operador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *workerMetricsTableName* pelo nome da tabela personalizada que você especificou para a tabela de métricas do operador.

## Etapa 2: reimplantar o código com a versão anterior da KCL
<a name="kcl-migration-rollback-step2"></a>

**Importante**  
Qualquer menção à versão 2.x na saída gerada pela Ferramenta de Migração da KCL deve ser interpretada como sendo a versão 1.x da KCL. A execução do script não realiza uma reversão completa, apenas alterna o algoritmo de balanceamento de carga para o usado na versão 1.x da KCL.

Depois de executar a Ferramenta de Migração da KCL para uma reversão, você verá uma destas mensagens:

Mensagem 1  
“Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Ação necessária:** isso significa que os operadores estavam executando no modo compatível com a KCL 1.x. Reimplante o código para os operadores com a versão anterior da KCL.

Mensagem 2  
“Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Ação necessária:** isso significa que os operadores estavam executando no modo da KCL 3.x e a Ferramenta de Migração da KCL mudou todos os operadores para o modo compatível com a KCL 1.x. Reimplante o código para os operadores com a versão anterior da KCL.

Mensagem 3  
“Application was already rolled back. Any KCLv3 resources that could be deleted were cleaned up to avoid charges until the application can be rolled forward with migration”.  
**Ação necessária:** isso significa que os operadores já foram revertidos para execução no modo compatível com a KCL 1.x. Reimplante o código para os operadores com a versão anterior da KCL.

# Avançar para a KCL 3.x após uma reversão
<a name="kcl-migration-rollforward"></a>

Este tópico explica como avançar sua aplicação de consumidor para a KCL 3.x após uma reversão. Quando precisar avançar, você deve concluir um processo de duas etapas:

1. Execute a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Implantar o código com a KCL 3.x.

## Etapa 1: executar a Ferramenta de Migração da KCL
<a name="kcl-migration-rollforward-step1"></a>

Execute a Ferramenta de Migração da KCL com o seguinte comando para avançar para a KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parâmetros
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Substitua *region* pela Região da AWS.

`--application_name`  
Esse parâmetro será obrigatório se você estiver usando nomes padrão para a tabela de estados do coordenador. Se você tiver especificado nomes personalizados para a tabela de estados do coordenador, poderá omitir esse parâmetro. Substitua *applicationName* pelo nome da aplicação da KCL. A ferramenta usa esse nome para obter os nomes de tabela padrão se os nomes personalizados não forem fornecidos.

`--coordinator_state_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de estados do coordenador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *coordinatorStateTableName* pelo nome da tabela personalizada que você especificou para a tabela de estados do coordenador.

Após a execução da Ferramenta de Migração no modo de avanço, o KCL cria os seguintes recursos do DynamoDB necessários para a KCL 3.x:
+ Um índice secundário global na tabela de concessões
+ Uma tabela de métricas do operador

## Etapa 2: implantar o código com a KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Depois de executar a Ferramenta de Migração da KCL para um avanço, implante seu código com a KCL 3.x nos operadores. Para concluir sua migração, consulte [Step 8: Complete the migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Demonstração: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

Esta seção é uma demonstração de uma aplicação em Java que usa a Amazon Kinesis Client Library e o Amazon DynamoDB Streams Kinesis Adapter. A aplicação mostra um exemplo de replicação de dados, no qual as atividades de gravação de uma tabela são aplicadas a uma segunda tabela, com o conteúdo de ambas mantido em sincronia. Para o código-fonte, consulte [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

O programa faz o seguinte:

1. Cria duas tabelas do DynamoDB chamadas `KCL-Demo-src` e `KCL-Demo-dst`. Cada uma dessas tabelas tem um fluxo habilitado.

1. Gera atividades de atualização na tabela de origem, adicionando, atualizando e excluindo itens. Isso faz com que os dados sejam gravados no fluxo da tabela.

1. Lê os registros do fluxo, faz a reconstrução desses registros como solicitações do DynamoDB e aplica essas solicitações à tabela de destino.

1. Verifica as tabelas de origem e destino para garantir que o conteúdo seja idêntico.

1. Realiza uma limpeza excluindo as tabelas.

Essas etapas estão descritas nas seções a seguir, e a aplicação completa é mostrada no final do passo-a-passo.

**Topics**
+ [Etapa 1: criar tabelas do DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Etapa 2: gerar atividades de atualização na tabela de origem](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Etapa 3: processar o fluxo](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Etapa 5: limpar](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Etapa 1: criar tabelas do DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

A primeira etapa é criar duas tabelas do DynamoDB: uma de origem e outra de destino. O `StreamViewType` no fluxo da tabela de origem é `NEW_IMAGE`. Isso significa que sempre que um item é modificado nessa tabela, o item “depois” da imagem é gravado no fluxo. Dessa forma, o fluxo mantém o controle de todas as atividades de gravação na tabela.

O exemplo a seguir mostra o código usado para a criação das duas tabelas.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Etapa 2: gerar atividades de atualização na tabela de origem
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

O próximo passo é gerar algumas atividades de gravação na tabela de origem. Enquanto essas atividades estão ocorrendo, o fluxo da tabela de origem também é atualizado quase em tempo real.

A aplicação define uma classe auxiliar com métodos que chamam as operações da API `PutItem`, `UpdateItem` e `DeleteItem` para gravar os dados. O exemplo de código a seguir mostra como esses métodos são usados.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Etapa 3: processar o fluxo
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Agora, o programa inicia o processamento do fluxo. O DynamoDB Streams Kinesis Adapter atua como uma camada transparente entre a KCL e o endpoint do DynamoDB Streams para que o código possa usar totalmente a KCL em vez de precisar fazer chamadas de baixo nível ao DynamoDB Streams. O programa realiza as seguintes tarefas:
+ Ele define uma classe de processador de registro, `StreamsRecordProcessor`, com métodos que estão em conformidade com a definição de interface da KCL: `initialize`, `processRecords` e `shutdown`. O método `processRecords` contém a lógica necessária para leituras do fluxo da tabela de origem e para gravações na tabela de destino.
+ Ele define uma fábrica de classes para a classe de processador de registro (`StreamsRecordProcessorFactory`). Isso é necessário para programas Java que usam a KCL.
+ Ele instancia um novo `Worker` da KCL, que está associado à fábrica de classes.
+ Ele desliga `Worker` quando o processamento do registro é concluído.

Opcionalmente, habilite o modo de recuperação na configuração do adaptador da KCL do Streams para escalar automaticamente a taxa de chamadas de API GetRecords em três vezes (padrão) quando o atraso no processamento de fluxos exceder um minuto (padrão), o que ajuda o consumidor de fluxos a lidar com altos picos de throughput na tabela.

Para saber mais sobre a definição da interface da KCL, consulte [Desenvolvimento de consumidores usando a Amazon Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*. 

O exemplo de código a seguir mostra o loop principal em `StreamsRecordProcessor`. A instrução `case` determina a ação a ser executada, com base no `OperationType` que aparece no registro de fluxo.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

Neste ponto, o conteúdo das tabelas de origem e destino está sincronizado. A aplicação emite solicitações `Scan` em ambas as tabelas para verificar se o conteúdo delas é realmente idêntico.

A classe `DemoHelper` contém um método `ScanTable` que chama a API de `Scan` de baixo nível. O exemplo a seguir mostra como fazer isso.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Etapa 5: limpar
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

A demonstração está concluída e, portanto, a aplicação exclui as tabelas de origem e destino. Consulte o seguinte exemplo de código. Mesmo depois que as tabelas são excluídas, seus fluxos permanecem disponíveis por até 24 horas. Após esse período, eles serão automaticamente excluídos.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programa completo: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Veja a seguir o programa Java completo que realiza as tarefas descritas em [Demonstração: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.md). Quando executá-lo, você verá uma saída semelhante à seguinte:

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Importante**  
 Para executar esse programa, verifique se a aplicação cliente tem acesso ao DynamoDB e ao Amazon CloudWatch usando políticas. Para obter mais informações, consulte [Políticas baseadas em identidade para o DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

O código-fonte consiste em quatro arquivos `.java`. Para criar esse programa, adicione a seguinte dependência, que inclui a Amazon Kinesis Client Library (KCL) 3.x e o AWS SDK para Java v2 como dependências temporárias:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Os arquivos de origem são:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# API de baixo nível do DynamoDB Streams: exemplo em Java
<a name="Streams.LowLevel.Walkthrough"></a>

**nota**  
O código nesta página não é completo e não trata todos os cenários de consumo do Amazon DynamoDB Streams. A maneira recomendada de consumir registros de stream do DynamoDB é por meio do Amazon Kinesis Adapter usando a Kinesis Client Library (KCL), conforme descrito em [Usar o adaptador do DynamoDB Streams Kinesis Adapter para processar registros de fluxos](Streams.KCLAdapter.md).

Esta seção contém um programa em Java que mostra o DynamoDB Streams em ação. O programa faz o seguinte:

1. Cria uma tabela do DynamoDB com um fluxo habilitado.

1. Descreve as configurações de fluxo dessa tabela.

1. Modifica os dados na tabela.

1. Descreve os fragmentos no fluxo.

1. Lê os registros de fluxo dos fragmentos.

1. Busca fragmentos filhos e continua lendo os registros.

1. Limpa.

Quando executar o programa, você verá um resultado semelhante ao seguinte:

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example Exemplo**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams e acionadores do AWS Lambda
<a name="Streams.Lambda"></a>

O Amazon DynamoDB é integrado ao AWS Lambda para que você possa criar *acionadores* (trechos de código que respondem automaticamente a eventos no DynamoDB Streams). Com os acionadores, você pode criar aplicações que reagem às modificações de dados em tabelas do DynamoDB.

**Topics**
+ [Tutorial 1: Usar filtros para processar todos os eventos com o Amazon DynamoDB e o AWS Lambda usando a AWS CLI](Streams.Lambda.Tutorial.md)
+ [Tutorial 2: Usar filtros para processar alguns eventos com o DynamoDB e o Lambda](Streams.Lambda.Tutorial2.md)
+ [Práticas recomendadas de uso do DynamoDB Streams com o Lambda](Streams.Lambda.BestPracticesWithDynamoDB.md)

Se você habilitar o DynamoDB Streams em uma tabela, poderá associar o nome do recurso da Amazon (ARN) do fluxo a uma função do AWS Lambda escrita por você. Todas as ações de mutação nessa tabela do DynamoDB poderão então ser capturadas como um item no fluxo. Por exemplo, é possível definir um gatilho para que, quando um item em uma tabela for modificado, um novo registro apareça imediatamente no fluxo dessa tabela. 

**nota**  
Se você inscrever mais de duas funções do Lambda em um fluxo do DynamoDB, poderá ocorrer controle de utilização de leitura.

O serviço [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) pesquisa o fluxo em busca de novos registros quatro vezes por segundo. Quando novos registros de fluxo estão disponíveis, a função do Lambda é invocada de maneira síncrona. É possível inscrever até duas funções do Lambda no mesmo fluxo do DynamoDB. Se você inscrever mais de duas funções do Lambda no mesmo fluxo do DynamoDB, poderá ocorrer controle de utilização de leitura.

A função do Lambda pode enviar uma notificação, iniciar uma workflow ou realizar qualquer outra ação especificada. É possível escrever uma função do Lambda para simplificar a cópia de cada registro de fluxo no armazenamento persistente, como o Gateway de Arquivos do Amazon S3 (Amazon S3), e criar uma trilha de auditoria permanente de atividades de gravação na tabela. Ou suponhamos que você tenha um aplicativo de jogos móveis que grava em uma tabela `GameScores`. Sempre que o atributo `TopScore` da tabela `GameScores` é atualizado, um registro de fluxo correspondente é gravado no fluxo da tabela. Este evento poderia, em seguida, acionar uma função do Lambda que posta uma mensagem de felicitações em uma rede de mídia social. Essa função também seria escrita para ignorar quaisquer registros de fluxo que não são atualizações para `GameScores` ou que não modificam o atributo `TopScore`.

Se a sua função retornar um erro, o Lambda tentará executar novamente o lote até que o processamento seja bem-sucedido ou os dados expirem. Você também pode configurar o Lambda para tentar novamente com um lote menor, limitar o número de tentativas, descartar registros quando eles se tornarem muito antigos e outras opções.

Como práticas recomendadas de performance, a função do Lambda precisa ser de curta duração. Para evitar a introdução de atrasos de processamento desnecessários, ela também não deve executar uma lógica complexa. Para um fluxo de alta velocidade em particular, é melhor acionar fluxos de trabalho assíncronos de função de etapa de pós-processamento do que Lambdas síncronos de longa execução.

 É possível usar acionadores do Lambda em diferentes contas da AWS configurando uma política baseada em recursos no fluxo do DynamoDB para conceder acesso de leitura entre contas à função do Lambda. Para saber mais sobre como configurar o fluxo para permitir acesso entre contas, consulte [Compartilhar acesso com funções do AWS Lambda entre contas](rbac-cross-account-access.md#shared-access-cross-acount-lambda) no “Guia do desenvolvedor do DynamoDB”.

Para obter mais informações sobre o AWS Lambda, consulte o [Guia do desenvolvedor do AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/).

# Tutorial 1: Usar filtros para processar todos os eventos com o Amazon DynamoDB e o AWS Lambda usando a AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

Neste tutorial, você cria um acionador do AWS Lambda para processar um fluxo de uma tabela do DynamoDB.

**Topics**
+ [Etapa 1: criar uma tabela do DynamoDB com um fluxo habilitado](#Streams.Lambda.Tutorial.CreateTable)
+ [Etapa 2: criar uma função de execução do Lambda](#Streams.Lambda.Tutorial.CreateRole)
+ [Etapa 3: criar um tópico do Amazon SNS](#Streams.Lambda.Tutorial.SNSTopic)
+ [Etapa 4: criar e testar uma função do Lambda](#Streams.Lambda.Tutorial.LambdaFunction)
+ [Etapa 5: criar e testar um acionador](#Streams.Lambda.Tutorial.CreateTrigger)

O cenário deste tutorial é o Woofer, uma rede social simples. Os usuários do Woofer se comunicam usando *barks* (mensagens de texto curtas) que são enviados a outros usuários do Woofer. O diagrama a seguir mostra os componentes e o fluxo de trabalho desse aplicativo.

![\[Fluxo de trabalho da aplicação Woofer de uma tabela do DynamoDB, registro de fluxos, função do Lambda e tópico do Amazon SNS.\]](http://docs.aws.amazon.com/pt_br/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. Um usuário grava um item em uma tabela do DynamoDB (`BarkTable`). Cada item na tabela representa um bark.

1. Um novo registro de fluxo é gravado para refletir que um novo item foi adicionado à `BarkTable`.

1. O novo registro de fluxo aciona uma função do AWS Lambda (`publishNewBark`).

1. Se o registro de fluxo indicar que um novo item foi adicionado à `BarkTable`, a função do Lambda lerá os dados do registro de fluxo e publicará uma mensagem em um tópico no Amazon Simple Notification Service (Amazon SNS).

1. A mensagem é recebida pelos assinantes do tópico do Amazon SNS. (Neste tutorial, o único assinante é um endereço de e-mail.)

**Antes de começar**  
Este tutorial usa a AWS Command Line Interface AWS CLI. Se você ainda não tiver feito isso, siga as instruções de instalação no [Guia do usuário do AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/) para instalar e configurar a AWS CLI.

## Etapa 1: criar uma tabela do DynamoDB com um fluxo habilitado
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

Nesta etapa, você cria uma tabela do DynamoDB (`BarkTable`) para armazenar todos os barks dos usuários do Woofer. A chave primária é composta de `Username` (chave de partição) e de `Timestamp` (chave de classificação). Ambos os atributos são do tipo string.

`BarkTable` tem um fluxo habilitado. Mais adiante neste tutorial, você criará um acionador associando uma função do AWS Lambda ao fluxo.

1. Use o seguinte comando para criar a tabela.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. Na saída, procure o `LatestStreamArn`.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   Anote a `region` e o `accountID`, pois eles serão necessários para as outras etapas deste tutorial.

## Etapa 2: criar uma função de execução do Lambda
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

Nesta etapa, você cria uma função do AWS Identity and Access Management (IAM) (`WooferLambdaRole`) e atribui permissões a ela. Essa função será usada pela função do Lambda que você cria em [Etapa 4: criar e testar uma função do Lambda](#Streams.Lambda.Tutorial.LambdaFunction). 

Você também cria uma política para a função. A política contém todas as permissões de que a função do Lambda precisa em tempo de execução.

1. Crie um arquivo denominado `trust-relationship.json` com os conteúdos a seguir.

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. Insira o seguinte comando para criar a `WooferLambdaRole`.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. Crie um arquivo denominado `role-policy.json` com os conteúdos a seguir. (Substitua `region` e `accountID` por sua região e seu ID de conta da AWS.)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   A política tem quatro declarações que fornecem permissões à `WooferLambdaRole` para fazer o seguinte:
   + Execute uma função do Lambda (`publishNewBark`). Você cria a função mais adiante neste tutorial.
   + Acesse o Amazon CloudWatch Logs. A função do Lambda grava o diagnóstico no CloudWatch Logs em tempo de execução.
   + Leia os dados do fluxo do DynamoDB para `BarkTable`.
   + Publique mensagens no Amazon SNS.

1. Execute o seguinte comando para anexar a política à função `WooferLambdaRole`.

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## Etapa 3: criar um tópico do Amazon SNS
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

Nesta etapa, você cria um tópico do Amazon SNS (`wooferTopic`) e inscreve um endereço de e-mail nele. A função do Lambda usa esse tópico para publicar novos barks de usuários do Woofer.

1. Digite o seguinte comando para criar um novo tópico do Amazon SNS.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. Digite o seguinte comando para inscrever um endereço de e-mail no `wooferTopic`. (Substitua `region` e `accountID` por sua região e ID da conta da AWS e substitua `example@example.com` por um endereço de e-mail válido.)

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. O Amazon SNS envia uma mensagem de confirmação ao seu endereço de e-mail. Selecione o link **Confirm subscription** (Confirmar assinatura) na mensagem para concluir o processo de assinatura.

## Etapa 4: criar e testar uma função do Lambda
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

Nesta etapa, você cria uma função do AWS Lambda (`publishNewBark`) para processar registros de fluxo da `BarkTable`.

A função `publishNewBark` processa apenas os eventos de fluxo que correspondem a novos itens na `BarkTable`. A função lê dados de um evento como esse e, em seguida, invoca o Amazon SNS; para publicá-lo.

1. Crie um arquivo denominado `publishNewBark.js` com os conteúdos a seguir. (Substitua `region` e `accountID` por sua região e seu ID de conta da AWS.)

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. Crie um arquivo zip para conter `publishNewBark.js`. Se você tiver o utilitário de linha de comando zip, poderá digitar o seguinte comando para fazer isso.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Ao criar a função do Lambda, você especifica o nome do recurso da Amazon (ARN) da `WooferLambdaRole` que você criou em [Etapa 2: criar uma função de execução do Lambda](#Streams.Lambda.Tutorial.CreateRole). Digite o seguinte comando para recuperar o ARN.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   Na saída, procure o ARN da `WooferLambdaRole`.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Use o seguinte comando para criar a função do Lambda. Substitua *roleARN* pelo ARN da `WooferLambdaRole`.

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. Agora teste o `publishNewBark` para verificar se ele funciona. Para fazer isso, você deve fornecer informações semelhantes a um registro real do DynamoDB Streams.

   Crie um arquivo denominado `payload.json` com os conteúdos a seguir. Substitua `region` e `accountID` por sua Região da AWS e seu ID de conta.

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   Use o seguinte comando para testar a função `publishNewBark`.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   Se o teste for bem-sucedido, você verá a seguinte saída.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   Além disso, o arquivo `output.txt` conterá o seguinte texto.

   ```
   "Successfully processed 1 records."
   ```

   Você também receberá uma nova mensagem de e-mail dentro de alguns minutos.
**nota**  
AWS LambdaO grava informações de diagnóstico no Amazon CloudWatch Logs. Se você encontrar erros em sua função do Lambda, poderá usar essas informações de diagnóstico para fins de solução de problemas:  
Abra o console do CloudWatch em [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
No painel de navegação, selecione **Logs**.
Escolha o grupo de logs a seguir: `/aws/lambda/publishNewBark`
Escolha o fluxo de logs mais recente para visualizar a saída (e os erros) da função.

## Etapa 5: criar e testar um acionador
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

Em [Etapa 4: criar e testar uma função do Lambda](#Streams.Lambda.Tutorial.LambdaFunction), você testou a função do Lambda para garantir que ela fosse executada corretamente. Nesta etapa, você cria um *acionador* associando a função do Lambda (`publishNewBark`) à origem de um evento (o fluxo `BarkTable`).

1. Ao criar o acionador, você deve especificar o ARN do fluxo de `BarkTable`. Digite o seguinte comando para recuperar o ARN.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   Na saída, procure o `LatestStreamArn`.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. Insira o seguinte comando para criar o acionador. Substitua `streamARN` pelo ARN do fluxo atual.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. Teste o acionador. Insira o seguinte comando para adicionar um item a `BarkTable`.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   Você deve receber uma nova mensagem de e-mail dentro de alguns minutos.

1. Abra o console do DynamoDB e adicione mais alguns itens a `BarkTable`. Você deve especificar valores para os atributos `Username` e `Timestamp`. (Você também deve especificar um valor para `Message`, embora isso não seja obrigatório.) Você deve receber uma nova mensagem de e-mail para cada item que adicionar a `BarkTable`.

   A função do Lambda processa apenas novos itens que você adiciona a `BarkTable`. Se você atualizar ou excluir um item na tabela, a função não fará nada.

**nota**  
O AWS Lambda grava informações de diagnóstico no Amazon CloudWatch Logs. Se você encontrar erros em sua função do Lambda, poderá usar essas informações de diagnóstico para fins de solução de problemas.  
Abra o console do CloudWatch em [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
No painel de navegação, selecione **Logs**.
Escolha o grupo de logs a seguir: `/aws/lambda/publishNewBark`
Escolha o fluxo de logs mais recente para visualizar a saída (e os erros) da função.

# Tutorial 2: Usar filtros para processar alguns eventos com o DynamoDB e o Lambda
<a name="Streams.Lambda.Tutorial2"></a>

Neste tutorial, você criará um acionador do AWS Lambda para processar somente alguns eventos em um fluxo de uma tabela do DynamoDB.

**Topics**
+ [Reunir todos os componentes: CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Reunir todos os componentes: CDK](#Streams.Lambda.Tutorial2.CDK)

Com a [filtragem de eventos do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), é possível utilizar expressões de filtro para controlar quais eventos o Lambda enviará para a função processar. É possível configurar até cinco filtros diferentes por fluxo do DynamoDB. Se você estiver usando janelas em lotes, o Lambda aplicará os critérios de filtro a cada novo evento para determinar se deseja adicioná-lo ao lote atual.

Os filtros são aplicados por meio de estruturas chamadas `FilterCriteria`. Os três principais atributos de `FilterCriteria` são `metadata properties`, `data properties` e `filter patterns`. 

Aqui está um exemplo de estrutura de um evento do DynamoDB Streams:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

Essas `metadata properties` são os campos do objeto do evento. No caso dos DynamoDB Streams, as `metadata properties` são campos como o `dynamodb` ou o `eventName`. 

Essas `data properties` são os campos do corpo do evento. Para filtrar as `data properties`, certifique-se de contê-las em `FilterCriteria` dentro da chave adequada. Para fontes de eventos do DynamoDB, a chave de dados é `NewImage` ou `OldImage`.

Por fim, as regras de filtro definirão a expressão de filtro que você deseja aplicar a uma propriedade específica. Veja alguns exemplos:


| Operador de comparação | Exemplo | Sintaxe da regra (parcial) | 
| --- | --- | --- | 
|  Nulo  |  O tipo de produto é nulo  |  `{ "product_type": { "S": null } } `  | 
|  Vazio  |  O nome do produto está vazio  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Igual  |  O estado é igual a Flórida  |  `{ "state": { "S": ["FL"] } } `  | 
|  E  |  O estado do produto é igual à Flórida e a categoria do produto é Chocolate  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Ou  |  O estado do produto é Flórida ou Califórnia  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Não  |  O estado do produto não é Flórida  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Existe  |  O produto caseiro existe  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  Não existe  |  O produto Homemade não existe  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Começa com  |  PK começa com COMPANY  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

É possível especificar até cinco padrões de filtragem de eventos em uma função do Lambda. Observe que cada um desses cinco eventos será avaliado como um OR lógico. Então, se você configurar dois filtros chamados `Filter_One` e `Filter_Two`, a função do Lambda executará `Filter_One` OU `Filter_Two`.

**nota**  
Na página de [filtragem de eventos do Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), há algumas opções para filtrar e comparar valores numéricos. No entanto, no caso de eventos de filtro do DynamoDB, isso não se aplica porque os números no DynamoDB são armazenados como strings. Por exemplo ` "quantity": { "N": "50" }`, sabemos que é um número por causa da propriedade `"N"`.

## Reunir todos os componentes: CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Para mostrar a funcionalidade de filtragem de eventos na prática, aqui está um exemplo de modelo do CloudFormation. Esse modelo gerará uma tabela simples do DynamoDB com uma chave de partição PK e uma chave de classificação SK com o Amazon DynamoDB Streams habilitado. Ele criará uma função do Lambda e uma função simples de execução do Lambda que permitirá gravar logs no Amazon Cloudwatch e ler os eventos do Amazon DynamoDB Stream. Ele também adicionará o mapeamento da origem do evento entre os DynamoDB Streams e a função do Lambda, para que a função possa ser executada sempre que houver um evento no Amazon DynamoDB Streams.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Depois de implantar esse modelo de formação de nuvem, é possível inserir o seguinte item do Amazon DynamoDB:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Graças à função simples do Lambda incluída em linha nesse modelo de formação de nuvem, você verá os eventos nos grupos de logs do Amazon CloudWatch para a função do Lambda da seguinte forma:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Exemplos de filtragem**
+ **Somente produtos que correspondam a um determinado estado**

Este exemplo modifica o modelo do CloudFormation para incluir um filtro que corresponda a todos os produtos provenientes da Flórida, com a abreviatura “FL”.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Depois de reimplantar a pilha, é possível adicionar o seguinte item do DynamoDB à tabela. Observe que ele não aparecerá nos logs de funções do Lambda, porque o produto neste exemplo é da Califórnia.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Somente os itens que começam com alguns valores em PK e SK**

Este exemplo modifica o modelo do CloudFormation para incluir a seguinte condição:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que a condição AND exige que a condição esteja dentro do padrão, onde as chaves PK e SK estão na mesma expressão separadas por vírgula.

Comece com alguns valores em PK e SK ou de determinado estado.

Este exemplo modifica o modelo do CloudFormation para incluir as seguintes condições:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que a condição OR é adicionada introduzindo novos padrões na seção de filtro.

## Reunir todos os componentes: CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

O exemplo de modelo de formação de projeto CDK a seguir mostra a funcionalidade de filtragem de eventos. Antes de trabalhar com esse projeto de CDK, será preciso [instalar os pré-requisitos](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html), incluindo a [execução de scripts de preparação](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Criar um projeto de CDK**

Primeiro, crie um novo projeto do AWS CDK, invocando `cdk init` em um diretório vazio.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

O comando `cdk init` usa o nome da pasta do projeto para nomear vários elementos do projeto, incluindo classes, subpastas e arquivos. Todos os hifens no nome da pasta são convertidos em sublinhados. Caso contrário, o nome deve seguir a forma de um identificador Python. Por exemplo, ele não deve começar com um número nem conter espaços.

Para trabalhar com o novo projeto, ative o respectivo ambiente virtual. Isso permite que as dependências do projeto sejam instaladas localmente na pasta do projeto, em vez de globalmente.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**nota**  
É possível reconhecer isso como o comando Mac/Linux para ativar um ambiente virtual. Os modelos do Python incluem um arquivo em lote, `source.bat`, que permite que o mesmo comando seja utilizado no Windows. O comando tradicional do Windows `.venv\Scripts\activate.bat` também funciona. Se você inicializou seu projeto do AWS CDK usando o AWS CDK Toolkit v1.70.0 ou anterior, seu ambiente virtual está no diretório `.env` em vez de `.venv`. 

**Infraestrutura base**

Abra o arquivo `./ddb_filters/ddb_filters_stack.py` com o editor de texto de sua preferência. Esse arquivo foi gerado automaticamente quando você criou o projeto do AWS CDK. 

Em seguida, adicione as funções `_create_ddb_table` e `_set_ddb_trigger_function`. Essas funções criarão uma tabela do DynamoDB com a chave de partição PK e a chave de classificação SK no modo de provisionamento sob demanda, com o Amazon DynamoDB Streams habilitado por padrão para mostrar imagens novas e antigas.

A função do Lambda será armazenada na pasta `lambda` abaixo do arquivo `app.py`. Esse arquivo será criado posteriormente. Ele incluirá uma variável de ambiente `APP_TABLE_NAME`, que será o nome da tabela do Amazon DynamoDB criada por essa pilha. Na mesma função, concederemos permissões de leitura de fluxo para a função do Lambda. Por fim, ele se inscreverá no DynamoDB Streams como fonte de eventos para a função do Lambda. 

No final do arquivo no método `__init__`, você chamará as respectivas estruturas para inicializá-las na pilha. Para projetos maiores que exigem componentes e serviços adicionais, talvez seja melhor definir essas estruturas fora da pilha base. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Agora, criaremos uma função do Lambda muito simples que imprimirá os logs no Amazon CloudWatch. Para fazer isso, crie uma pasta chamada `lambda`.

```
mkdir lambda
touch app.py
```

Usando o editor de texto de sua preferência, adicione o seguinte conteúdo ao arquivo `app.py`:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Garantindo que você esteja na pasta `/ddb_filters/`, digite o seguinte comando para criar a aplicação de exemplo:

```
cdk deploy
```

Em algum momento, você deverá confirmar se deseja implantar a solução. Aceite as alterações digitando `Y`.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Depois que as alterações forem implantadas, abra o console da AWS e adicione um item à tabela. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Os logs do CloudWatch agora devem conter todas as informações dessa entrada. 

**Exemplos de filtragem**
+ **Somente produtos que correspondam a um determinado estado**

Abra o arquivo `ddb_filters/ddb_filters/ddb_filters_stack.py` e modifique-o para incluir o filtro que corresponde a todos os produtos que são iguais a “FL”. Isso pode ser revisado logo abaixo de `event_subscription` na linha 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Somente os itens que começam com alguns valores em PK e SK**

Modifique o script Python para incluir a seguinte condição:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **Comece com alguns valores em PK e SK ou de determinado estado.**

Modifique o script Python para incluir as seguintes condições:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Observe que a condição OR é adicionada incluindo mais elementos à matriz Filters (Filtros).

**Limpeza**

Localize a pilha de filtros na base do diretório de trabalho e execute `cdk destroy`. Confirme a exclusão do recurso:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Práticas recomendadas de uso do DynamoDB Streams com o Lambda
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

Uma função do AWS Lambda é executada em um *contêiner*, um ambiente de execução isolado de outras funções. Quando você executa uma função pela primeira vez, o AWS Lambda cria um novo contêiner e começa a executar o código da função.

Uma função do Lambda tem um *manipulador* que é executado uma vez por invocação. O manipulador contém a lógica de negócios principal da função. Por exemplo, a função do Lambda mostrada em [Etapa 4: criar e testar uma função do Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) tem um identificador que pode processar registros em um fluxo do DynamoDB. 

Você também pode fornecer o código de inicialização que é executado apenas uma vez: depois que o contêiner é criado, mas antes que o AWS Lambda execute o manipulador pela primeira vez. A função do Lambda mostrada em [Etapa 4: criar e testar uma função do Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) possui um código de inicialização que importa o SDK for JavaScript in Node.js e cria um cliente para o Amazon SNS. Esses objetos devem ser definidos somente uma vez, fora do manipulador.

Depois da execução da função, o AWS Lambda pode optar por reutilizar o contêiner para invocações subsequentes da função. Neste caso, o manipulador da função pode reutilizar os recursos que você definiu no seu código de inicialização. (Você não pode controlar por quanto tempo o AWS Lambda reterá o contêiner, ou se o contêiner será reutilizado.)

Para acionadores do DynamoDB que usam o AWS Lambda, recomendamos o seguinte:
+ AWSOs clientes de serviço da devem ser instanciados no código de inicialização, e não no manipulador. Isso permite que o AWS Lambda reutilize conexões existentes, durante o ciclo de vida do contêiner.
+ Em geral, você não precisa gerenciar explicitamente as conexões ou implementar o pool de conexões porque o AWS Lambda gerencia isso para você.

Um consumidor do Lambda para um fluxo do DynamoDB não garante entrega exatamente uma vez, podendo resultar em duplicações ocasionais. Verifique se o código da função do Lambda é idempotente para evitar que problemas inesperados ocorram devido ao processamento de duplicações.

Para obter mais informações, consulte [Práticas recomendadas para trabalhar com funções do AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html) no *Guia do desenvolvedor do AWS Lambda*.

# DynamoDB Streams e Apache Flink
<a name="StreamsApacheFlink.xml"></a>

É possível consumir registros do Amazon DynamoDB Streams com o Apache Flink. Com o [Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/), é possível transformar e analisar dados de streaming em tempo real usando o Apache Flink. O Apache Flink é um framework de processamento de fluxos de código aberto para processar dados em tempo real. O conector do Amazon DynamoDB Streams para Apache Flink simplifica a criação e o gerenciamento de workloads do Apache Flink e permite que você integre aplicações com outros Serviços da AWS.

O Amazon Managed Service for Apache Flink ajuda você a criar rapidamente aplicações de processamento de fluxos de ponta a ponta para analytics de logs, analytics de clickstream, Internet das Coisas (IoT), tecnologia de anúncios, jogos e muito mais. Os quatro casos de uso mais comuns são extração, transformação e carregamento (ETL) de streaming, aplicações orientadas a eventos, analytics responsivas em tempo real e consultas interativas de fluxos de dados. Para ter mais informações sobre como gravar do Amazon DynamoDB Streams no Apache Flink, consulte [Amazon DynamoDB Streams Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/).