Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Walkthrough: DynamoDB-Streams-Kinesis-Adapter
In diesem Abschnitt wird eine Anleitung für eine Java-Anwendung gegeben, die die Amazon-Kinesis-Client-Library und den Amazon-DynamoDB-Streams-Kinesis-Adapter verwendet. Die Anwendung zeigt ein Beispiel für die Datenreplikation, wobei Schreibaktivitäten einer Tabelle auf eine zweite Tabelle angewendet werden und die Inhalte beider Tabellen synchron bleiben. Sie finden den Quellcode unter Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter.
Das Programm führt Folgendes aus:
-
Erstellt zwei DynamoDB-Tabellen namens
KCL-Demo-src
undKCL-Demo-dst
. Für jede dieser Tabellen ist ein Stream aktiviert. -
Generiert Aktualisierungsaktivitäten in der Quelltabelle durch Hinzufügen, Aktualisieren und Löschen von Elementen. Dies bewirkt, dass Daten in den Tabellenstream geschrieben werden.
-
Liest die Datensätze aus dem Stream, rekonstruiert diese als DynamoDB-Anforderungen und wendet die Anforderungen auf die Zieltabelle an.
-
Scannt die Quell- und Zieltabellen, um sicherzustellen, dass ihre Inhalte identisch sind.
-
Bereinigt die Daten durch Löschen der Tabellen.
Diese Schritte werden in den folgenden Abschnitten beschrieben und die vollständige Anwendung wird am Ende der Anleitung angezeigt.
Themen
- Schritt 1: Erstellen einer DynamoDB-Tabelle
- Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
- Schritt 3: Verarbeiten des Streams
- Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
- Schritt 5: Bereinigen
- Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter
Schritt 1: Erstellen einer DynamoDB-Tabelle
Im ersten Schritt erstellen Sie zwei DynamoDB-Tabellen—eine Quelltabelle und eine Zieltabelle. Der StreamViewType
des Streams der Quelltabelle lautet NEW_IMAGE
. Das bedeutet, dass sobald ein Element in dieser Tabelle geändert wird, das Image des Elements nach der Änderung in den Stream geschrieben wird. So verfolgt der Stream alle Schreibaktivitäten der Tabelle.
Das folgende Beispiel zeigt den Code für das Erstellen von beiden Tabellen.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
Im nächsten Schritt erstellen Sie einige Schreibaktivitäten in der Quelltabelle. Während diese Aktivitäten ausgeführt werden, wird der Stream der Quelltabelle nahezu in Echtzeit ebenfalls aktualisiert.
Die Anwendung definiert eine Hilfsklasse mit Methoden, die die DeleteItem
API Operationen PutItem
UpdateItem
, und zum Schreiben der Daten aufrufen. Das folgende Codebeispiel zeigt, wie diese Methoden verwendet werden.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Schritt 3: Verarbeiten des Streams
Das Programm beginnt mit der Verarbeitung des Streams. Der DynamoDB Streams Kinesis Adapter fungiert als transparente Ebene zwischen dem DynamoDB Streams-Endpunkt KCL und dem DynamoDB Streams Streams-Endpunkt, sodass der Code vollständig genutzt werden kann, KCL anstatt DynamoDB Streams Streams-Aufrufe auf niedriger Ebene tätigen zu müssen. Das Programm führt die folgenden Aufgaben durch:
-
Er definiert eine Datensatzprozessorklasse,, mit Methoden
StreamsRecordProcessor
, die der Schnittstellendefinition entsprechen:, und. KCLinitialize
processRecords
shutdown
DieprocessRecords
-Methode enthält die Logik, die zum Lesen von der Quelltabelle des Streams und zum Schreiben in die Zieltabelle erforderlich ist. -
Sie definiert eine ClassFactory für die Datensatzprozessor-Klasse (
StreamsRecordProcessorFactory
). Dies ist für Java-Programme erforderlich, die das verwendenKCL. -
Es instanziiert eine neue KCL
Worker
, die der Klassenfabrik zugeordnet ist. -
Sie fährt
Worker
herunter, wenn die Datensatzverarbeitung abgeschlossen ist.
Weitere Informationen zur KCL Schnittstellendefinition finden Sie unter Developing Consumer using the Kinesis Client Library im Amazon Kinesis Data Streams Developer Guide.
Das folgende Codebeispiel zeigt die Hauptschleife in StreamsRecordProcessor
. Die case
-Anweisung bestimmt, welche Aktion basierend auf dem OperationType
, der im Stream-Datensatz erscheint, durchgeführt werden soll.
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
An diesem Punkt sind die Inhalte der Quell- und Zieltabellen synchronisiert. Die Anwendung gibt Scan
-Anforderungen für beide Tabellen aus, um sicherzustellen, dass ihre Inhalte identisch sind.
Die DemoHelper
Klasse enthält eine ScanTable
Methode, die das Low-Level aufruft. Scan
API Das Verfahren wird im folgenden Beispiel beschrieben.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
Schritt 5: Bereinigen
Die Demonstration ist abgeschlossen, so dass die Anwendung Quell- und Zieltabellen löscht. Beachten Sie hierzu das folgende Codebeispiel. Nachdem die Tabellen gelöscht wurden, bleiben die Streams für bis zu 24 Stunden verfügbar. Anschließend werden sie automatisch gelöscht.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));