Esta seção é uma demonstração de uma aplicação em Java que usa a Amazon Kinesis Client Library e o Amazon DynamoDB Streams Kinesis Adapter. A aplicação mostra um exemplo de replicação de dados, no qual as atividades de gravação de uma tabela são aplicadas a uma segunda tabela, com o conteúdo de ambas mantido em sincronia. Para o código-fonte, consulte Programa completo: DynamoDB Streams Kinesis Adapter.
O programa faz o seguinte:
-
Cria duas tabelas do DynamoDB chamadas
KCL-Demo-src
eKCL-Demo-dst
. Cada uma dessas tabelas tem um fluxo habilitado. -
Gera atividades de atualização na tabela de origem, adicionando, atualizando e excluindo itens. Isso faz com que os dados sejam gravados no fluxo da tabela.
-
Lê os registros do fluxo, faz a reconstrução desses registros como solicitações do DynamoDB e aplica essas solicitações à tabela de destino.
-
Verifica as tabelas de origem e destino para garantir que o conteúdo seja idêntico.
-
Realiza uma limpeza excluindo as tabelas.
Essas etapas estão descritas nas seções a seguir, e a aplicação completa é mostrada no final do passo-a-passo.
Tópicos
Etapa 1: criar tabelas do DynamoDB
A primeira etapa é criar duas tabelas do DynamoDB: uma de origem e outra de destino. O StreamViewType
no fluxo da tabela de origem é NEW_IMAGE
. Isso significa que sempre que um item é modificado nessa tabela, o item “depois” da imagem é gravado no fluxo. Dessa forma, o fluxo mantém o controle de todas as atividades de gravação na tabela.
O exemplo a seguir mostra o código usado para a criação das duas tabelas.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));
java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
// key
ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
.withWriteCapacityUnits(2L);
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
.withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
.withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Etapa 2: gerar atividades de atualização na tabela de origem
O próximo passo é gerar algumas atividades de gravação na tabela de origem. Enquanto essas atividades estão ocorrendo, o fluxo da tabela de origem também é atualizado quase em tempo real.
A aplicação define uma classe auxiliar com métodos que chamam as operações da API PutItem
, UpdateItem
e DeleteItem
para gravar os dados. O exemplo de código a seguir mostra como esses métodos são usados.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Etapa 3: processar o fluxo
Agora, o programa inicia o processamento do fluxo. O DynamoDB Streams Kinesis Adapter atua como uma camada transparente entre a KCL e o endpoint do DynamoDB Streams para que o código possa usar totalmente a KCL em vez de precisar fazer chamadas de baixo nível ao DynamoDB Streams. O programa realiza as seguintes tarefas:
-
Ele define uma classe de processador de registro,
StreamsRecordProcessor
, com métodos que estão em conformidade com a definição de interface da KCL:initialize
,processRecords
eshutdown
. O métodoprocessRecords
contém a lógica necessária para leituras do fluxo da tabela de origem e para gravações na tabela de destino. -
Ele define uma fábrica de classes para a classe de processador de registro (
StreamsRecordProcessorFactory
). Isso é necessário para programas Java que usam a KCL. -
Ele instancia um novo
Worker
da KCL, que está associado à fábrica de classes. -
Ele desliga
Worker
quando o processamento do registro é concluído.
Para saber mais sobre a definição da interface da KCL, consulte Desenvolvimento de consumidores usando a Amazon Kinesis Client Library no Guia do desenvolvedor do Amazon Kinesis Data Streams.
O exemplo de código a seguir mostra o loop principal em StreamsRecordProcessor
. A instrução case
determina a ação a ser executada, com base no OperationType
que aparece no registro de fluxo.
for (Record record : records) {
String data = new String(record.getData().array(), Charset.forName("UTF-8"));
System.out.println(data);
if (record instanceof RecordAdapter) {
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
.getInternalObject();
switch (streamRecord.getEventName()) {
case "INSERT":
case "MODIFY":
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getNewImage());
break;
case "REMOVE":
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getKeys().get("Id").getN());
}
}
checkpointCounter += 1;
if (checkpointCounter % 10 == 0) {
try {
checkpointer.checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico
Neste ponto, o conteúdo das tabelas de origem e destino está sincronizado. A aplicação emite solicitações Scan
em ambas as tabelas para verificar se o conteúdo delas é realmente idêntico.
A classe DemoHelper
contém um método ScanTable
que chama a API de Scan
de baixo nível. O exemplo a seguir mostra como fazer isso.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
.equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
System.out.println("Scan result is equal.");
}
else {
System.out.println("Tables are different!");
}
Etapa 5: limpar
A demonstração está concluída e, portanto, a aplicação exclui as tabelas de origem e destino. Consulte o seguinte exemplo de código. Mesmo depois que as tabelas são excluídas, seus fluxos permanecem disponíveis por até 24 horas. Após esse período, eles serão automaticamente excluídos.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));