Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
In diesem Abschnitt wird eine Anleitung für eine Java-Anwendung gegeben, die die Amazon-Kinesis-Client-Library und den Amazon-DynamoDB-Streams-Kinesis-Adapter verwendet. Die Anwendung zeigt ein Beispiel für die Datenreplikation, wobei Schreibaktivitäten einer Tabelle auf eine zweite Tabelle angewendet werden und die Inhalte beider Tabellen synchron bleiben. Sie finden den Quellcode unter Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter.
Das Programm führt Folgendes aus:
-
Erstellt zwei DynamoDB-Tabellen namens
KCL-Demo-src
undKCL-Demo-dst
. Für jede dieser Tabellen ist ein Stream aktiviert. -
Generiert Aktualisierungsaktivitäten in der Quelltabelle durch Hinzufügen, Aktualisieren und Löschen von Elementen. Dies bewirkt, dass Daten in den Tabellenstream geschrieben werden.
-
Liest die Datensätze aus dem Stream, rekonstruiert diese als DynamoDB-Anforderungen und wendet die Anforderungen auf die Zieltabelle an.
-
Scannt die Quell- und Zieltabellen, um sicherzustellen, dass ihre Inhalte identisch sind.
-
Bereinigt die Daten durch Löschen der Tabellen.
Diese Schritte werden in den folgenden Abschnitten beschrieben und die vollständige Anwendung wird am Ende der Anleitung angezeigt.
Themen
Schritt 1: Erstellen einer DynamoDB-Tabelle
Im ersten Schritt erstellen Sie zwei DynamoDB-Tabellen—eine Quelltabelle und eine Zieltabelle. Der StreamViewType
des Streams der Quelltabelle lautet NEW_IMAGE
. Das bedeutet, dass sobald ein Element in dieser Tabelle geändert wird, das Image des Elements nach der Änderung in den Stream geschrieben wird. So verfolgt der Stream alle Schreibaktivitäten der Tabelle.
Das folgende Beispiel zeigt den Code für das Erstellen von beiden Tabellen.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));
java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
// key
ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
.withWriteCapacityUnits(2L);
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
.withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
.withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
Im nächsten Schritt erstellen Sie einige Schreibaktivitäten in der Quelltabelle. Während diese Aktivitäten ausgeführt werden, wird der Stream der Quelltabelle nahezu in Echtzeit ebenfalls aktualisiert.
Die Anwendung definiert die Hilfsprogrammklasse mit Methoden zum Aufrufen der API-Operationen PutItem
, UpdateItem
und DeleteItem
zum Schreiben der Daten. Das folgende Codebeispiel zeigt, wie diese Methoden verwendet werden.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Schritt 3: Verarbeiten des Streams
Das Programm beginnt mit der Verarbeitung des Streams. Der DynamoDB Streams Kinesis Adapter fungiert als transparente Ebene zwischen der KCL und dem DynamoDB Streams-Endpunkt, sodass der Code die KCL vollständig nutzen kann, statt DynamoDB-Streams-Low-Level-Aufrufe tätigen zu müssen. Das Programm führt die folgenden Aufgaben durch:
-
Es definiert eine Datensatzprozessor-Klasse,
StreamsRecordProcessor
, mit Methoden, die mit der KCL-Schnittstellendefinition übereinstimmen:initialize
,processRecords
undshutdown
. DieprocessRecords
-Methode enthält die Logik, die zum Lesen von der Quelltabelle des Streams und zum Schreiben in die Zieltabelle erforderlich ist. -
Sie definiert eine ClassFactory für die Datensatzprozessor-Klasse (
StreamsRecordProcessorFactory
). Dies ist für Java-Programme, die die KCL verwenden, erforderlich. -
Die Methode instanziiert eine neue KCL
Worker
, die der Class Factory zugeordnet ist. -
Sie fährt
Worker
herunter, wenn die Datensatzverarbeitung abgeschlossen ist.
Weitere Informationen zur KCL-Schnittstellendefinition finden Sie unter Entwickeln von Konsumenten mithilfe der Kinesis-Clientbibliothek im Amazon-Kinesis-Data-Streams-Entwicklerhandbuch.
Das folgende Codebeispiel zeigt die Hauptschleife in StreamsRecordProcessor
. Die case
-Anweisung bestimmt, welche Aktion basierend auf dem OperationType
, der im Stream-Datensatz erscheint, durchgeführt werden soll.
for (Record record : records) {
String data = new String(record.getData().array(), Charset.forName("UTF-8"));
System.out.println(data);
if (record instanceof RecordAdapter) {
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
.getInternalObject();
switch (streamRecord.getEventName()) {
case "INSERT":
case "MODIFY":
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getNewImage());
break;
case "REMOVE":
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getKeys().get("Id").getN());
}
}
checkpointCounter += 1;
if (checkpointCounter % 10 == 0) {
try {
checkpointer.checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
An diesem Punkt sind die Inhalte der Quell- und Zieltabellen synchronisiert. Die Anwendung gibt Scan
-Anforderungen für beide Tabellen aus, um sicherzustellen, dass ihre Inhalte identisch sind.
Die DemoHelper
-Klasse enthält eine ScanTable
-Methode, die die Scan
-Low-Level-API aufruft. Das Verfahren wird im folgenden Beispiel beschrieben.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
.equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
System.out.println("Scan result is equal.");
}
else {
System.out.println("Tables are different!");
}
Schritt 5: Bereinigen
Die Demonstration ist abgeschlossen, so dass die Anwendung Quell- und Zieltabellen löscht. Beachten Sie hierzu das folgende Codebeispiel. Nachdem die Tabellen gelöscht wurden, bleiben die Streams für bis zu 24 Stunden verfügbar. Anschließend werden sie automatisch gelöscht.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));