Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams
In questa sezione viene riportata una spiegazione passo per passo di un'applicazione Java che utilizza Amazon Kinesis Client Library e l'adattatore Amazon DynamoDB Streams Kinesis. L'applicazione mostra un esempio di replica dei dati, in cui l'attività di scrittura da una tabella viene applicata a una seconda tabella e i contenuti di entrambe le tabelle rimangono sincronizzati. Per il codice sorgente, consulta Programma completo: Adattatore Kinesis di DynamoDB Streams.
Il programma effettua le seguenti operazioni:
-
Crea due tabelle DynamoDB denominate
KCL-Demo-src
eKCL-Demo-dst
. Su ognuna di queste tabelle è abilitato un flusso. -
Genera l'attività di aggiornamento nella tabella di origine aggiungendo, aggiornando ed eliminando gli elementi. Questo fa sì che i dati vengano scritti nel flusso della tabella.
-
Legge i record dal flusso, li ricostruisce come richieste DynamoDB e applica le richieste alla tabella di destinazione.
-
Esegue la scansione delle tabelle di origine e di destinazione per garantire che i contenuti siano identici.
-
Esegue la pulizia eliminando le tabelle.
Queste fasi sono descritte nelle sezioni seguenti e l'applicazione completa viene mostrata alla fine della procedura guidata.
Argomenti
Fase 1: creazione di tabelle DynamoDB
Il primo passo consiste nel creare due tabelle DynamoDB, una di origine e una di destinazione. StreamViewType
sul flusso della tabella di origine è NEW_IMAGE
. Questo significa che ogni volta che un item viene modificato in questa tabella, l'immagine "successiva" dell'item viene scritta nel flusso. In questo modo, il flusso tiene traccia di tutte le attività di scrittura della tabella.
Il seguente esempio mostra il codice utilizzato per creare entrambe le tabelle.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Fase 2: generazione dell'attività di aggiornamento nella tabella di origine
La fase successiva consiste nel generare le attività di scrittura sulla tabella di origine. Mentre questa attività è in corso, il flusso della tabella di origine viene aggiornato pressoché in tempo reale.
L'applicazione definisce una classe di supporto con metodi che chiamano PutItem
UpdateItem
, e DeleteItem
API operazioni per la scrittura dei dati. Il seguente esempio di codice mostra come vengono utilizzati questi metodi.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Fase 3: elaborazione del flusso
Ora il programma inizia l'elaborazione del flusso. Il DynamoDB Streams Kinesis Adapter funge da livello trasparente tra l'endpoint DynamoDB Streams KCL e l'endpoint DynamoDB Streams, in modo che il codice possa essere utilizzato appieno anziché dover effettuare chiamate DynamoDB Streams di basso livello. KCL Il programma esegue le attività di seguito elencate:
-
Definisce una classe di processori di record, con metodi conformi alla definizione dell'interfaccia:,
StreamsRecordProcessor
, e. KCLinitialize
processRecords
shutdown
Il metodoprocessRecords
contiene la logica necessaria per la lettura dal flusso della tabella di origine e la scrittura nella tabella di destinazione. -
Definisce una factory di classe per la classe di elaboratore di record (
StreamsRecordProcessorFactory
). Ciò è necessario per i programmi Java che utilizzanoKCL. -
Crea un'istanza nuova KCL
Worker
, associata alla class factory. -
Arresta il
Worker
quando l'elaborazione del record è completata.
Per ulteriori informazioni sulla definizione dell'KCLinterfaccia, consulta Developing consumer using the Kinesis Client Library nella Amazon Kinesis Data Streams Developer Guide.
Il seguente esempio di codice mostra il loop principale in StreamsRecordProcessor
. L'istruzione case
determina quale operazione eseguire, sulla base dell'item OperationType
presente nel record del flusso.
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
Fase 4: verifica che entrambe le tabelle abbiano contenuti identici
A questo punto, i contenuti delle tabelle di origine e destinazione sono sincronizzati. L'applicazione emette le richieste Scan
su entrambe le tabelle per verificare che i loro contenuti siano effettivamente identici.
La DemoHelper
classe contiene un ScanTable
metodo che chiama il livello basso. Scan
API L'esempio seguente mostra come viene utilizzato.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
Fase 5: rimozione
La demo è completata, quindi l'applicazione elimina le tabelle di origine e di destinazione. Vedere l'esempio di codice seguente. Anche dopo l'eliminazione delle tabelle, i flussi rimangono disponibili per altre 24 ore, dopo di che vengono automaticamente eliminati.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));