Demonstração: DynamoDB Streams Kinesis Adapter - Amazon DynamoDB

Demonstração: DynamoDB Streams Kinesis Adapter

Esta seção é uma demonstração de uma aplicação em Java que usa a Amazon Kinesis Client Library e o Amazon DynamoDB Streams Kinesis Adapter. A aplicação mostra um exemplo de replicação de dados, no qual as atividades de gravação de uma tabela são aplicadas a uma segunda tabela, com o conteúdo de ambas mantido em sincronia. Para o código-fonte, consulte Programa completo: DynamoDB Streams Kinesis Adapter.

O programa faz o seguinte:

  1. Cria duas tabelas do DynamoDB chamadas KCL-Demo-src e KCL-Demo-dst. Cada uma dessas tabelas tem um fluxo habilitado.

  2. Gera atividades de atualização na tabela de origem, adicionando, atualizando e excluindo itens. Isso faz com que os dados sejam gravados no fluxo da tabela.

  3. Lê os registros do fluxo, faz a reconstrução desses registros como solicitações do DynamoDB e aplica essas solicitações à tabela de destino.

  4. Verifica as tabelas de origem e destino para garantir que o conteúdo seja idêntico.

  5. Realiza uma limpeza excluindo as tabelas.

Essas etapas estão descritas nas seções a seguir, e a aplicação completa é mostrada no final do passo-a-passo.

Etapa 1: criar tabelas do DynamoDB

A primeira etapa é criar duas tabelas do DynamoDB: uma de origem e outra de destino. O StreamViewType no fluxo da tabela de origem é NEW_IMAGE. Isso significa que sempre que um item é modificado nessa tabela, o item “depois” da imagem é gravado no fluxo. Dessa forma, o fluxo mantém o controle de todas as atividades de gravação na tabela.

O exemplo a seguir mostra o código usado para a criação das duas tabelas.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

Etapa 2: gerar atividades de atualização na tabela de origem

O próximo passo é gerar algumas atividades de gravação na tabela de origem. Enquanto essas atividades estão ocorrendo, o fluxo da tabela de origem também é atualizado quase em tempo real.

A aplicação define uma classe auxiliar com métodos que chamam as operações da API PutItem, UpdateItem e DeleteItem para gravar os dados. O exemplo de código a seguir mostra como esses métodos são usados.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

Etapa 3: processar o fluxo

Agora, o programa inicia o processamento do fluxo. O DynamoDB Streams Kinesis Adapter atua como uma camada transparente entre a KCL e o endpoint do DynamoDB Streams para que o código possa usar totalmente a KCL em vez de precisar fazer chamadas de baixo nível ao DynamoDB Streams. O programa realiza as seguintes tarefas:

  • Ele define uma classe de processador de registro, StreamsRecordProcessor, com métodos que estão em conformidade com a definição de interface da KCL: initialize, processRecords e shutdown. O método processRecords contém a lógica necessária para leituras do fluxo da tabela de origem e para gravações na tabela de destino.

  • Ele define uma fábrica de classes para a classe de processador de registro (StreamsRecordProcessorFactory). Isso é necessário para programas Java que usam a KCL.

  • Ele instancia um novo Worker da KCL, que está associado à fábrica de classes.

  • Ele desliga Worker quando o processamento do registro é concluído.

Para saber mais sobre a definição da interface da KCL, consulte Desenvolvimento de consumidores usando a Amazon Kinesis Client Library no Guia do desenvolvedor do Amazon Kinesis Data Streams.

O exemplo de código a seguir mostra o loop principal em StreamsRecordProcessor. A instrução case determina a ação a ser executada, com base no OperationType que aparece no registro de fluxo.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico

Neste ponto, o conteúdo das tabelas de origem e destino está sincronizado. A aplicação emite solicitações Scan em ambas as tabelas para verificar se o conteúdo delas é realmente idêntico.

A classe DemoHelper contém um método ScanTable que chama a API de Scan de baixo nível. O exemplo a seguir mostra como fazer isso.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

Etapa 5: limpar

A demonstração está concluída e, portanto, a aplicação exclui as tabelas de origem e destino. Consulte o seguinte exemplo de código. Mesmo depois que as tabelas são excluídas, seus fluxos permanecem disponíveis por até 24 horas. Após esse período, eles serão automaticamente excluídos.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));