

# Demonstração: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

Esta seção é uma demonstração de uma aplicação em Java que usa a Amazon Kinesis Client Library e o Amazon DynamoDB Streams Kinesis Adapter. A aplicação mostra um exemplo de replicação de dados, no qual as atividades de gravação de uma tabela são aplicadas a uma segunda tabela, com o conteúdo de ambas mantido em sincronia. Para o código-fonte, consulte [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

O programa faz o seguinte:

1. Cria duas tabelas do DynamoDB chamadas `KCL-Demo-src` e `KCL-Demo-dst`. Cada uma dessas tabelas tem um fluxo habilitado.

1. Gera atividades de atualização na tabela de origem, adicionando, atualizando e excluindo itens. Isso faz com que os dados sejam gravados no fluxo da tabela.

1. Lê os registros do fluxo, faz a reconstrução desses registros como solicitações do DynamoDB e aplica essas solicitações à tabela de destino.

1. Verifica as tabelas de origem e destino para garantir que o conteúdo seja idêntico.

1. Realiza uma limpeza excluindo as tabelas.

Essas etapas estão descritas nas seções a seguir, e a aplicação completa é mostrada no final do passo-a-passo.

**Topics**
+ [Etapa 1: criar tabelas do DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Etapa 2: gerar atividades de atualização na tabela de origem](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Etapa 3: processar o fluxo](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Etapa 5: limpar](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Etapa 1: criar tabelas do DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

A primeira etapa é criar duas tabelas do DynamoDB: uma de origem e outra de destino. O `StreamViewType` no fluxo da tabela de origem é `NEW_IMAGE`. Isso significa que sempre que um item é modificado nessa tabela, o item “depois” da imagem é gravado no fluxo. Dessa forma, o fluxo mantém o controle de todas as atividades de gravação na tabela.

O exemplo a seguir mostra o código usado para a criação das duas tabelas.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Etapa 2: gerar atividades de atualização na tabela de origem
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

O próximo passo é gerar algumas atividades de gravação na tabela de origem. Enquanto essas atividades estão ocorrendo, o fluxo da tabela de origem também é atualizado quase em tempo real.

A aplicação define uma classe auxiliar com métodos que chamam as operações da API `PutItem`, `UpdateItem` e `DeleteItem` para gravar os dados. O exemplo de código a seguir mostra como esses métodos são usados.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Etapa 3: processar o fluxo
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Agora, o programa inicia o processamento do fluxo. O DynamoDB Streams Kinesis Adapter atua como uma camada transparente entre a KCL e o endpoint do DynamoDB Streams para que o código possa usar totalmente a KCL em vez de precisar fazer chamadas de baixo nível ao DynamoDB Streams. O programa realiza as seguintes tarefas:
+ Ele define uma classe de processador de registro, `StreamsRecordProcessor`, com métodos que estão em conformidade com a definição de interface da KCL: `initialize`, `processRecords` e `shutdown`. O método `processRecords` contém a lógica necessária para leituras do fluxo da tabela de origem e para gravações na tabela de destino.
+ Ele define uma fábrica de classes para a classe de processador de registro (`StreamsRecordProcessorFactory`). Isso é necessário para programas Java que usam a KCL.
+ Ele instancia um novo `Worker` da KCL, que está associado à fábrica de classes.
+ Ele desliga `Worker` quando o processamento do registro é concluído.

Opcionalmente, habilite o modo de recuperação na configuração do adaptador da KCL do Streams para escalar automaticamente a taxa de chamadas de API GetRecords em três vezes (padrão) quando o atraso no processamento de fluxos exceder um minuto (padrão), o que ajuda o consumidor de fluxos a lidar com altos picos de throughput na tabela.

Para saber mais sobre a definição da interface da KCL, consulte [Desenvolvimento de consumidores usando a Amazon Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*. 

O exemplo de código a seguir mostra o loop principal em `StreamsRecordProcessor`. A instrução `case` determina a ação a ser executada, com base no `OperationType` que aparece no registro de fluxo.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

Neste ponto, o conteúdo das tabelas de origem e destino está sincronizado. A aplicação emite solicitações `Scan` em ambas as tabelas para verificar se o conteúdo delas é realmente idêntico.

A classe `DemoHelper` contém um método `ScanTable` que chama a API de `Scan` de baixo nível. O exemplo a seguir mostra como fazer isso.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Etapa 5: limpar
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

A demonstração está concluída e, portanto, a aplicação exclui as tabelas de origem e destino. Consulte o seguinte exemplo de código. Mesmo depois que as tabelas são excluídas, seus fluxos permanecem disponíveis por até 24 horas. Após esse período, eles serão automaticamente excluídos.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```