Demonstração: DynamoDB Streams Kinesis Adapter
Esta seção é uma demonstração de uma aplicação em Java que usa a Amazon Kinesis Client Library e o Amazon DynamoDB Streams Kinesis Adapter. A aplicação mostra um exemplo de replicação de dados, no qual as atividades de gravação de uma tabela são aplicadas a uma segunda tabela, com o conteúdo de ambas mantido em sincronia. Para o código-fonte, consulte Programa completo: DynamoDB Streams Kinesis Adapter.
O programa faz o seguinte:
-
Cria duas tabelas do DynamoDB chamadas
KCL-Demo-src
eKCL-Demo-dst
. Cada uma dessas tabelas tem um fluxo habilitado. -
Gera atividades de atualização na tabela de origem, adicionando, atualizando e excluindo itens. Isso faz com que os dados sejam gravados no fluxo da tabela.
-
Lê os registros do fluxo, faz a reconstrução desses registros como solicitações do DynamoDB e aplica essas solicitações à tabela de destino.
-
Verifica as tabelas de origem e destino para garantir que o conteúdo seja idêntico.
-
Realiza uma limpeza excluindo as tabelas.
Essas etapas estão descritas nas seções a seguir, e a aplicação completa é mostrada no final do passo-a-passo.
Tópicos
Etapa 1: criar tabelas do DynamoDB
A primeira etapa é criar duas tabelas do DynamoDB: uma de origem e outra de destino. O StreamViewType
no fluxo da tabela de origem é NEW_IMAGE
. Isso significa que sempre que um item é modificado nessa tabela, o item “depois” da imagem é gravado no fluxo. Dessa forma, o fluxo mantém o controle de todas as atividades de gravação na tabela.
O exemplo a seguir mostra o código usado para a criação das duas tabelas.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Etapa 2: gerar atividades de atualização na tabela de origem
O próximo passo é gerar algumas atividades de gravação na tabela de origem. Enquanto essas atividades estão ocorrendo, o fluxo da tabela de origem também é atualizado quase em tempo real.
A aplicação define uma classe auxiliar com métodos que chamam as operações da API PutItem
, UpdateItem
e DeleteItem
para gravar os dados. O exemplo de código a seguir mostra como esses métodos são usados.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Etapa 3: processar o fluxo
Agora, o programa inicia o processamento do fluxo. O DynamoDB Streams Kinesis Adapter atua como uma camada transparente entre a KCL e o endpoint do DynamoDB Streams para que o código possa usar totalmente a KCL em vez de precisar fazer chamadas de baixo nível ao DynamoDB Streams. O programa realiza as seguintes tarefas:
-
Ele define uma classe de processador de registro,
StreamsRecordProcessor
, com métodos que estão em conformidade com a definição de interface da KCL:initialize
,processRecords
eshutdown
. O métodoprocessRecords
contém a lógica necessária para leituras do fluxo da tabela de origem e para gravações na tabela de destino. -
Ele define uma fábrica de classes para a classe de processador de registro (
StreamsRecordProcessorFactory
). Isso é necessário para programas Java que usam a KCL. -
Ele instancia um novo
Worker
da KCL, que está associado à fábrica de classes. -
Ele desliga
Worker
quando o processamento do registro é concluído.
Para saber mais sobre a definição da interface da KCL, consulte Desenvolvimento de consumidores usando a Amazon Kinesis Client Library no Guia do desenvolvedor do Amazon Kinesis Data Streams.
O exemplo de código a seguir mostra o loop principal em StreamsRecordProcessor
. A instrução case
determina a ação a ser executada, com base no OperationType
que aparece no registro de fluxo.
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico
Neste ponto, o conteúdo das tabelas de origem e destino está sincronizado. A aplicação emite solicitações Scan
em ambas as tabelas para verificar se o conteúdo delas é realmente idêntico.
A classe DemoHelper
contém um método ScanTable
que chama a API de Scan
de baixo nível. O exemplo a seguir mostra como fazer isso.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
Etapa 5: limpar
A demonstração está concluída e, portanto, a aplicação exclui as tabelas de origem e destino. Consulte o seguinte exemplo de código. Mesmo depois que as tabelas são excluídas, seus fluxos permanecem disponíveis por até 24 horas. Após esse período, eles serão automaticamente excluídos.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));