

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Walkthrough: DynamoDB-Streams-Kinesis-Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

In diesem Abschnitt wird eine Anleitung für eine Java-Anwendung gegeben, die die Amazon-Kinesis-Client-Library und den Amazon-DynamoDB-Streams-Kinesis-Adapter verwendet. Die Anwendung zeigt ein Beispiel für die Datenreplikation, wobei Schreibaktivitäten einer Tabelle auf eine zweite Tabelle angewendet werden und die Inhalte beider Tabellen synchron bleiben. Sie finden den Quellcode unter [Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Das Programm führt Folgendes aus:

1. Erstellt zwei DynamoDB-Tabellen namens `KCL-Demo-src` und `KCL-Demo-dst`. Für jede dieser Tabellen ist ein Stream aktiviert.

1. Generiert Aktualisierungsaktivitäten in der Quelltabelle durch Hinzufügen, Aktualisieren und Löschen von Elementen. Dies bewirkt, dass Daten in den Tabellenstream geschrieben werden.

1. Liest die Datensätze aus dem Stream, rekonstruiert diese als DynamoDB-Anforderungen und wendet die Anforderungen auf die Zieltabelle an.

1. Scannt die Quell- und Zieltabellen, um sicherzustellen, dass ihre Inhalte identisch sind.

1. Bereinigt die Daten durch Löschen der Tabellen.

Diese Schritte werden in den folgenden Abschnitten beschrieben und die vollständige Anwendung wird am Ende der Anleitung angezeigt.

**Topics**
+ [Schritt 1: Erstellen einer DynamoDB-Tabelle](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Schritt 3: Verarbeiten des Streams](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Schritt 5: Bereinigen](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Schritt 1: Erstellen einer DynamoDB-Tabelle
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Im ersten Schritt erstellen Sie zwei DynamoDB-Tabellen—eine Quelltabelle und eine Zieltabelle. Der `StreamViewType` des Streams der Quelltabelle lautet `NEW_IMAGE`. Das bedeutet, dass sobald ein Element in dieser Tabelle geändert wird, das Image des Elements nach der Änderung in den Stream geschrieben wird. So verfolgt der Stream alle Schreibaktivitäten der Tabelle.

Das folgende Beispiel zeigt den Code für das Erstellen von beiden Tabellen.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

Im nächsten Schritt erstellen Sie einige Schreibaktivitäten in der Quelltabelle. Während diese Aktivitäten ausgeführt werden, wird der Stream der Quelltabelle nahezu in Echtzeit ebenfalls aktualisiert.

Die Anwendung definiert die Hilfsprogrammklasse mit Methoden zum Aufrufen der API-Operationen `PutItem`, `UpdateItem` und `DeleteItem` zum Schreiben der Daten. Das folgende Codebeispiel zeigt, wie diese Methoden verwendet werden.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Schritt 3: Verarbeiten des Streams
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Das Programm beginnt mit der Verarbeitung des Streams. Der DynamoDB Streams Kinesis Adapter fungiert als transparente Ebene zwischen der KCL und dem DynamoDB Streams-Endpunkt, sodass der Code die KCL vollständig nutzen kann, statt DynamoDB-Streams-Low-Level-Aufrufe tätigen zu müssen. Das Programm führt die folgenden Aufgaben durch:
+ Es definiert eine Datensatzprozessor-Klasse, `StreamsRecordProcessor`, mit Methoden, die mit der KCL-Schnittstellendefinition übereinstimmen: `initialize`, `processRecords` und `shutdown`. Die `processRecords`-Methode enthält die Logik, die zum Lesen von der Quelltabelle des Streams und zum Schreiben in die Zieltabelle erforderlich ist.
+ Sie definiert eine ClassFactory für die Datensatzprozessor-Klasse (`StreamsRecordProcessorFactory`). Dies ist für Java-Programme, die die KCL verwenden, erforderlich.
+ Die Methode instanziiert eine neue KCL `Worker`, die der Class Factory zugeordnet ist.
+ Sie fährt `Worker` herunter, wenn die Datensatzverarbeitung abgeschlossen ist.

Aktivieren Sie optional den Aufholmodus in Ihrer Streams-KCL-Adapterkonfiguration, um die GetRecords API-Aufrufrate automatisch um das Dreifache zu skalieren (Standard), wenn die Verzögerung der Stream-Verarbeitung eine Minute überschreitet (Standard), sodass Ihr Stream-Consumer hohe Durchsatzspitzen in Ihrer Tabelle bewältigen kann.

Weitere Informationen zur KCL-Schnittstellendefinition finden Sie unter [Entwickeln von Konsumenten mithilfe der Kinesis-Clientbibliothek](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Amazon-Kinesis-Data-Streams-Entwicklerhandbuch*. 

Das folgende Codebeispiel zeigt die Hauptschleife in `StreamsRecordProcessor`. Die `case`-Anweisung bestimmt, welche Aktion basierend auf dem `OperationType`, der im Stream-Datensatz erscheint, durchgeführt werden soll.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

An diesem Punkt sind die Inhalte der Quell- und Zieltabellen synchronisiert. Die Anwendung gibt `Scan`-Anforderungen für beide Tabellen aus, um sicherzustellen, dass ihre Inhalte identisch sind.

Die `DemoHelper`-Klasse enthält eine `ScanTable`-Methode, die die `Scan`-Low-Level-API aufruft. Das Verfahren wird im folgenden Beispiel beschrieben.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Schritt 5: Bereinigen
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Die Demonstration ist abgeschlossen, so dass die Anwendung Quell- und Zieltabellen löscht. Beachten Sie hierzu das folgende Codebeispiel. Nachdem die Tabellen gelöscht wurden, bleiben die Streams für bis zu 24 Stunden verfügbar. Anschließend werden sie automatisch gelöscht.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```