

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden des DynamoDB-Streams-Kinesis-Adapters zum Verarbeiten von Stream-Datensätzen
<a name="Streams.KCLAdapter"></a>

Die Verwendung des Amazon-Kinesis-Adapters ist die empfohlene Methode zum Verwenden von Streams aus Amazon DynamoDB. Die DynamoDB-Streams-API ist absichtlich ähnlich gestaltet wie die von Kinesis Data Streams. In beiden Services bestehen die Daten-Streams aus Shards. Dieses sind Container für Stream-Datensätze. Beide Dienste APIs enthalten`ListStreams`, `DescribeStream``GetShards`, und `GetShardIterator` Operationen. (Diese DynamoDB-Streams-Aktionen ähneln ihren Gegenstücken in Kinesis Data Streams, sind jedoch nicht vollkommen identisch.)

Als DynamoDB-Streams-Benutzer können Sie das Entwurfsmuster in der KCL zum Verarbeiten von DynamoDB-Streams-Shards und Stream-Datensätzen verwenden. Verwenden Sie dazu den DynamoDB-Streams-Kinesis-Adapter. Der Kinesis-Adapter implementiert die Kinesis-Data-Streams-Schnittstelle so, dass die KCL zum Verwenden und Verarbeiten von Datensätzen aus DynamoDB Streams eingesetzt werden kann. [Anweisungen zur Einrichtung und Installation des DynamoDB Streams Kinesis Adapters finden Sie im Repository. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

Sie können Anwendungen für Kinesis Data Streams mit der Kinesis Client Library (KCL) schreiben. Die KCL vereinfacht die Codierung durch Bereitstellen nützlicher Abstraktionen oberhalb der Low-Level-Kinesis-Data-Streams-API. Weitere Informationen zur KCL finden Sie im [Entwickeln von Konsumenten mit der Kinesis-Client-Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Entwicklerhandbuch zu Amazon Kinesis Data Streams*.

DynamoDB empfiehlt die Verwendung von KCL Version 3.x mit AWS SDK for Java v2.x. [Die aktuelle Version 1.x des DynamoDB Streams Kinesis Adapters mit AWS SDK für AWS SDK für Java v1.x wird während des gesamten Lebenszyklus weiterhin vollständig unterstützt, wie dies während der Übergangsphase gemäß den Wartungsrichtlinien für Tools vorgesehen war.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**Anmerkung**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 verfügbar sein. end-of-support Wir empfehlen dringend, KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 zur neuesten KCL-Version zu migrieren. Die neueste KCL-Version finden Sie auf der Seite [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Weitere Informationen zur aktuellen Versionen finden Sie unter [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Weitere Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter „Migrating from KCL 1.x to KCL 3.x“.

Das folgende Diagramm zeigt, wie diese Bibliotheken miteinander interagieren.

![\[Interaktion zwischen DynamoDB Streams, Kinesis Data Streams und KCL zur Verarbeitung von DynamoDB-Streams-Datensätzen.\]](http://docs.aws.amazon.com/de_de/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Wenn Sie den DynamoDB-Streams-Kinesis-Adapter eingerichtet haben, können Sie die Entwicklung mit der KCL-Schnittstelle starten, wobei die API-Aufrufe nahtlos an den DynamoDB-Streams-Endpunkt gerichtet werden.

Beim Start der Anwendung wird die KCL aufgerufen, einen Worker zu instanziieren. Sie müssen dem Worker Konfigurationsinformationen für die Anwendung, wie z. B. den Stream-Deskriptor und die AWS Anmeldeinformationen, sowie den Namen einer von Ihnen angegebenen Datensatzprozessorklasse zur Verfügung stellen. Da der Code im Datensatzprozessor ausgeführt wird, erledigt der Worker die folgenden Aufgaben:
+ Stellt eine Verbindung mit dem Stream her
+ Listet die Shards innerhalb des Streams auf
+ Überprüft und listet untergeordnete Shards eines geschlossenen übergeordneten Shards innerhalb des Streams auf
+ Koordiniert Shard-Zuordnungen mit anderen Auftragnehmern (wenn vorhanden)
+ Instanziiert einen Datensatzverarbeiter für jeden Shard, der verwaltet wird
+ Ruft Datensätze aus dem Stream per Pull ab
+ Skaliert die GetRecords API-Aufrufrate bei hohem Durchsatz (wenn der Nachholmodus konfiguriert ist)
+ Überträgt per Push Datensätze an den entsprechenden Datensatzverarbeiter
+ Verwendet Checkpoints für verarbeitete Datensätze
+ Gleicht Shard-Auftragnehmer-Zuordnungen aus, wenn die Auftragnehmer-Instance Änderungen zählt
+ Gleicht Shard-Worker-Zuordnungen aus, wenn Shards aufgeteilt werden

Der KCL-Adapter unterstützt den Catch-up-Modus, eine Funktion zur automatischen Anpassung der Anrufrate zur Bewältigung vorübergehender Durchsatzerhöhungen. Wenn die Verzögerung bei der Stream-Verarbeitung einen konfigurierbaren Schwellenwert (standardmäßig eine Minute) überschreitet, skaliert der Aufholmodus die GetRecords API-Aufruffrequenz um einen konfigurierbaren Wert (Standard 3x), um Datensätze schneller abzurufen, und kehrt dann zum Normalzustand zurück, sobald die Verzögerung nachlässt. Dies ist in Zeiten mit hohem Durchsatz nützlich, in denen DynamoDB-Schreibaktivitäten die Verbraucher bei Verwendung der Standardabfrageraten überfordern können. Der Nachholmodus kann über den Konfigurationsparameter aktiviert werden (Standardeinstellung falsch). `catchupEnabled`

**Anmerkung**  
Eine Beschreibung der hier aufgeführten KCL-Konzepte finden Sie unter [Entwickeln von Konsumenten mithilfe der Kinesis-Clientbibliothek](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Amazon-Kinesis-Data-Streams-Entwicklerhandbuch*.  
Weitere Informationen zur Verwendung von Streams mit AWS Lambda [DynamoDB Streams und -Trigger AWS Lambda](Streams.Lambda.md)

# Migrieren von KCL 1.x zu KCL 3.x
<a name="streams-migrating-kcl"></a>

## -Übersicht
<a name="migrating-kcl-overview"></a>

In dieser Anleitung wird erläutert, wie Sie Ihre Verbraucheranwendung von KCL 1.x zu KCL 3.x migrieren. Aufgrund der unterschiedlichen Architektur von KCL 1.x und KCL 3.x müssen für die Migration mehrere Komponenten aktualisiert werden, um die Kompatibilität sicherzustellen.

KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. Sie müssen zuerst die Klassen „Datensatzprozessor“, „Datensatzprozessor-Factory“ und „Worker“ in das KCL 3.x-kompatible Format migrieren und dann die Schritte für die Migration von KCL 1.x zu KCL 3.x ausführen.

## Schritte zur Migration
<a name="migration-steps"></a>

**Topics**
+ [Schritt 1: Migrieren des Datensatzprozessors](#step1-record-processor)
+ [Schritt 2: Migrieren der Datensatzprozessor-Factory](#step2-record-processor-factory)
+ [Schritt 3: Migrieren des Workers](#step3-worker-migration)
+ [Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x](#step4-configuration-migration)
+ [Schritt 5: Migrieren von KCL 2.x zu KCL 3.x](#step5-kcl2-to-kcl3)

### Schritt 1: Migrieren des Datensatzprozessors
<a name="step1-record-processor"></a>

Das folgende Beispiel zeigt einen Datensatzprozessor, der für die Version KCL 1.x des DynamoDB-Streams-Kinesis-Adapters implementiert wurde:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Um die RecordProcessor Klasse zu migrieren**

1. Ändern Sie die Schnittstellen von `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` und `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` folgendermaßen zu `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Aktualisieren Sie die Importanweisungen für die Methoden `initialize` und `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Ersetzen Sie die Methode `shutdownRequested` durch die folgenden neuen Methoden: `leaseLost`, `shardEnded` und `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Nachstehend finden Sie die aktualisierte Version der Datensatzprozessorklasse:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Anmerkung**  
DynamoDB Streams Kinesis Adapter verwendet SDKv2 jetzt das Record-Modell. In SDKv2 geben komplexe `AttributeValue` Objekte (`BS`,,, `NS` `M``L`,`SS`) niemals Null zurück. Verwenden Sie die Methoden `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()`, um zu überprüfen, ob diese Werte existieren.

### Schritt 2: Migrieren der Datensatzprozessor-Factory
<a name="step2-record-processor-factory"></a>

Die Datensatzprozessor-Factory ist für das Erstellen von Prozessoren verantwortlich, wenn eine Lease erworben wird. Nachfolgend sehen Sie ein Beispiel für eine KCL-1.x-Factory:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**So migrieren Sie die `RecordProcessorFactory`**
+ Ändern Sie die implementierte Schnittstelle von `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` folgendermaßen zu `software.amazon.kinesis.processor.ShardRecordProcessorFactory`:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Nachfolgend sehen Sie ein Beispiel für die Datensatzprozessor-Factory in 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Schritt 3: Migrieren des Workers
<a name="step3-worker-migration"></a>

In Version 3.0 der KCL wird die **Worker**-Klasse durch eine neue Klasse namens **Scheduler** ersetzt. Nachfolgend sehen Sie ein Beispiel für einen KCL-1.x-Worker.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**So migrieren Sie den Worker**

1. Ändern Sie die `import`-Anweisung für die `Worker`-Klasse in die Import-Anweisungen für die Klassen `Scheduler` und `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importieren Sie `StreamTracker` und ändern Sie den Import von `StreamsWorkerFactory` zu `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Wählen Sie die Position, von der aus die Anwendung gestartet werden soll. Möglich sind `TRIM_HORIZON` oder `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Erstellen Sie eine `StreamTracker`-Instance.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Erstellen Sie das Objekt `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Erstellen Sie das Objekt `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Erstellen Sie den `Scheduler` mithilfe von `ConfigsBuilder`, wie im folgenden Beispiel gezeigt:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Wichtig**  
Die Einstellung `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` gewährleistet die Kompatibilität zwischen dem DynamoDB-Streams-Kinesis-Adapter für KCL v3 und KCL v1, nicht zwischen KCL v2 und v3.

### Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x
<a name="step4-configuration-migration"></a>

Eine ausführliche Beschreibung der nach KCL 1.x eingeführten Konfigurationen, die für KCL 3.x relevant sind, finden Sie unter [KCL Configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) und [KCL Migration Client Configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Wichtig**  
Anstatt Objekte von `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` und `retrievalConfig` direkt zu erstellen, empfehlen wir, `ConfigsBuilder` zu verwenden, um Konfigurationen in KCL 3.x und aktuelleren Versionen einzustellen. Das verhindert Probleme mit der Scheduler-Initialisierung. `ConfigsBuilder` bietet eine flexiblere und wartungsfreundlichere Methode zur Konfiguration Ihrer KCL-Anwendung.

#### Konfigurationen mit dem Standardwert für Aktualisierungen in KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
In der KCL-Version 1.x ist der Standardwert für `billingMode` auf `PROVISIONED` eingestellt. Bei der KCL-Version 3.x ist die Standardeinstellung für `billingMode` jedoch `PAY_PER_REQUEST` (On-Demand-Modus). Wir empfehlen Ihnen, den On-Demand-Kapazitätsmodus für Ihre Leasetabelle zu verwenden, um die Kapazität automatisch an die Nutzung anzupassen. Anleitungen zur Verwendung der bereitgestellten Kapazität für Ihre Leasetabellen finden Sie unter [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
In der KCL-Version 1.x ist der Standardwert für `idleTimeBetweenReadsInMillis` auf 1 000 (oder 1 Sekunde) eingestellt. Die KCL-Version 3.x legt den Standardwert für `dleTimeBetweenReadsInMillis` auf 1 500 (oder 1,5 Sekunden) fest. Der Amazon-DynamoDB-Streams-Kinesis-Adapter überschreibt den Standardwert jedoch mit 1 000 (oder 1 Sekunde).

#### Neue Konfigurationen in KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Diese Konfiguration definiert das Zeitintervall, bis die Verarbeitung neu erkannter Shards beginnt. Es wird nach der Formel 1,5 × `leaseAssignmentIntervalMillis` berechnet. Wenn diese Einstellung nicht explizit konfiguriert ist, beträgt das Zeitintervall standardmäßig 1,5 × `failoverTimeMillis`. Die Verarbeitung neuer Shards beinhaltet das Scannen der Leasetabelle und das Abfragen eines globalen sekundären Index (GSI) in der Leasetabelle. Eine Absenkung des `leaseAssignmentIntervalMillis`-Werts erhöht die Häufigkeit dieser Scan- und Abfragevorgänge, was zu höheren DynamoDB-Kosten führt. Wir empfehlen, diesen Wert auf 2 000 (oder 2 Sekunden) einzustellen, um die Verzögerung bei der Verarbeitung neuer Shards zu minimieren.

`shardConsumerDispatchPollIntervalMillis`  
Diese Konfiguration definiert das Intervall zwischen aufeinanderfolgenden Abfragen durch den Shard-Verbraucher, um Zustandsübergänge auszulösen. In KCL-Version 1.x wurde dieses Verhalten durch den Parameter `idleTimeInMillis` gesteuert, der nicht als konfigurierbare Einstellung verfügbar war. Bei KCL-Version 3.x empfehlen wir, diese Konfiguration auf den Wert einzustellen, der in Ihrer KCL-Version 1.x für ` idleTimeInMillis` verwendet wurde.

### Schritt 5: Migrieren von KCL 2.x zu KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Um für einen reibungslosen Übergang und Kompatibilität mit der neuesten Version der Kinesis Client Library (KCL) zu sorgen, folgen Sie den Schritten 5–8 der Anleitung für das [Upgrade von KCL 2.x auf KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) im Migrationsleitfaden.

Informationen zur Fehlerbehebung in KCL 3.x finden Sie unter [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Rollback zu einer früheren KCL-Version
<a name="kcl-migration-rollback"></a>

In diesem Abschnitt wird erläutert, wie Sie Ihre Verbraucheranwendung auf die vorherige KCL-Version zurücksetzen können. Der Rollback-Prozess besteht aus zwei Schritten:

1. Führen Sie das [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) aus.

1. Stellen Sie den Code der vorherigen KCL-Version erneut bereit.

## Schritt 1: Ausführen des KCL Migration Tools
<a name="kcl-migration-rollback-step1"></a>

Wenn Sie zur vorherigen KCL-Version zurückkehren möchten, müssen Sie das KCL Migration Tool ausführen. Das Tool erfüllt zwei wichtige Aufgaben:
+ Es entfernt die Metadatentabelle für Worker-Metriken und den globalen sekundären Index aus der Leasetabelle in DynamoDB. Diese Artefakte werden von KCL 3.x erstellt, werden aber nicht benötigt, wenn Sie zur vorherigen Version zurückkehren.
+ Dadurch werden alle Worker in einem mit KCL 1.x kompatiblen Modus ausgeführt und verwenden wieder den Load-Balancing-Algorithmus aus früheren KCL-Versionen. Eventuelle Probleme mit dem neuen Load-Balancing-Algorithmus in KCL 3.x werden dadurch sofort behoben.

**Wichtig**  
Die Koordinatorstatustabelle in DynamoDB muss vorhanden sein und darf während des Migrations-, Rollback- und Rollforward-Prozesses nicht gelöscht werden.

**Anmerkung**  
Es ist wichtig, dass alle Worker in Ihrer Verbraucheranwendung zu einem bestimmten Zeitpunkt denselben Load-Balancing-Algorithmus verwenden. Das KCL Migration Tool sorgt dafür, dass alle Worker in Ihrer KCL-3.x-Verbraucheranwendung in den KCL-1.x-kompatiblen Modus wechseln. So führen alle Worker während des Rollbacks zur vorherigen KCL-Version denselben Load-Balancing-Algorithmus aus.

Sie können das [KCL-Migrationstool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) im Skriptverzeichnis des [ GitHubKCL-Repositorys](https://github.com/awslabs/amazon-kinesis-client/tree/master) herunterladen. Führen Sie das Skript über einen Worker oder Host aus, der berechtigt ist, in die Koordinatorstatustabelle, die Tabelle mit den Worker-Metriken und die Leasetabelle zu schreiben. Stellen Sie sicher, dass die entsprechenden [IAM-Berechtigungen](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) für KCL-Verbraucheranwendungen konfiguriert sind. Führen Sie das Skript mit dem folgenden Befehl nur einmal pro KCL-Anwendung aus:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Ersetze es durch *region* dein. AWS-Region

`--application_name`  
Dieser Parameter ist erforderlich, wenn Sie Standardnamen für Ihre DynamoDB-Metadatentabellen (Leasetabelle, Koordinatorstatustabelle und Worker-Metriktabelle) verwenden. Wenn Sie benutzerdefinierte Namen für diese Tabellen festgelegt haben, können Sie diesen Parameter weglassen. *applicationName*Ersetzen Sie es durch Ihren tatsächlichen KCL-Anwendungsnamen. Das Tool verwendet diesen Namen, um die Standardtabellennamen abzuleiten, wenn keine benutzerdefinierten Namen angegeben werden.

`--lease_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Leasetabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *leaseTableName*Ersetzen Sie durch den benutzerdefinierten Tabellennamen, den Sie für Ihre Leasingtabelle angegeben haben.

`--coordinator_state_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Koordinatorstatustabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *coordinatorStateTableName*Ersetzen Sie es durch den benutzerdefinierten Tabellennamen, den Sie für Ihre Koordinatorstatentabelle angegeben haben.

`--worker_metrics_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Worker-Metriktabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *workerMetricsTableName*Ersetzen Sie es durch den Namen der benutzerdefinierten Tabelle, den Sie für Ihre Tabelle mit Arbeitskennzahlen angegeben haben.

## Schritt 2: Erneutes Bereitstellen des Codes mit der vorherigen KCL-Version
<a name="kcl-migration-rollback-step2"></a>

**Wichtig**  
Jede Erwähnung von Version 2.x in der vom KCL Migration Tool generierten Ausgabe sollte als Bezugnahme auf KCL-Version 1.x interpretiert werden. Durch die Ausführung des Skripts erfolgt kein vollständiges Rollback. Es wird lediglich der Load-Balancing-Algorithmus auf den in KCL Version 1.x verwendeten umgestellt.

Nachdem das KCL Migration Tool den Rollback ausgeführt hat, wird eine der folgenden Meldungen angezeigt:

Nachricht 1  
„Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version.“  
**Erforderliche Maßnahme:** Dies bedeutet, dass Ihre Worker im KCL-1.x-kompatiblen Modus ausgeführt wurden. Stellen Sie den Code für Ihre Worker erneut mit der vorherigen KCL-Version bereit.

Nachricht 2  
„Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version.“  
**Erforderliche Maßnahme:** Dies bedeutet, dass Ihre Worker im KCL-3.x-kompatiblen Modus ausgeführt wurden und das KCL Migration Tool alle Worker auf den KCL-1.x-kompatiblen Modus zurückgesetzt hat. Stellen Sie den Code für Ihre Worker erneut mit der vorherigen KCL-Version bereit.

Nachricht 3  
„Application was already rolled back. Alle KCLv3 Ressourcen, die gelöscht werden könnten, wurden gelöscht, um Gebühren zu vermeiden, bis die Anwendung im Rahmen der Migration weiterentwickelt werden kann.“  
**Erforderliche Maßnahme:** Dies bedeutet, dass Ihre Worker bereits in den KCL-1.x-kompatiblen Modus zurückversetzt wurden. Stellen Sie den Code für Ihre Worker erneut mit der vorherigen KCL-Version bereit.

# Rollforward zu KCL 3.x nach einem Rollback
<a name="kcl-migration-rollforward"></a>

In diesem Abschnitt wird erläutert, wie Sie Ihre Verbraucheranwendung nach einem Rollback wieder auf KCL 3.x umstellen können. Wenn Sie einen Rollforward durchführen müssen, sind zwei Schritte erforderlich:

1. Führen Sie das [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) aus.

1. Stellen Sie den Code mit KCL 3.x bereit.

## Schritt 1: Ausführen des KCL Migration Tools
<a name="kcl-migration-rollforward-step1"></a>

Führen Sie das KCL Migration Tool mit dem folgenden Befehl aus, um KCL 3.x wiederherzustellen:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Ersetze es *region* durch dein AWS-Region.

`--application_name`  
Dieser Parameter ist erforderlich, wenn Sie Standardnamen für die Koordinatorstatustabelle verwenden. Wenn Sie benutzerdefinierte Namen für diese Tabelle festgelegt haben, können Sie diesen Parameter weglassen. *applicationName*Ersetzen Sie es durch Ihren tatsächlichen KCL-Anwendungsnamen. Das Tool verwendet diesen Namen, um die Standardtabellennamen abzuleiten, wenn keine benutzerdefinierten Namen angegeben werden.

`--coordinator_state_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Koordinatorstatustabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *coordinatorStateTableName*Ersetzen Sie durch den benutzerdefinierten Tabellennamen, den Sie für Ihre Koordinatorstatustabelle angegeben haben.

Nachdem Sie das Migrationstool im Roll-Forward-Modus ausgeführt haben, erstellt KCL die folgenden DynamoDB-Ressourcen, die für KCL 3.x erforderlich sind:
+ einen globalen sekundären Index für die Leasetabelle
+ eine Tabelle mit Worker-Metriken

## Schritt 2: Bereitstellen des Codes mit KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Nachdem Sie den Rollforward mit dem KCL Migration Tool ausgeführt haben, stellen Sie Ihren Code mit KCL 3.x für Ihre Worker bereit. Informationen zum Abschließen Ihrer Migration finden Sie unter [Schritt 8: Abschließen der Migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Walkthrough: DynamoDB-Streams-Kinesis-Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

In diesem Abschnitt wird eine Anleitung für eine Java-Anwendung gegeben, die die Amazon-Kinesis-Client-Library und den Amazon-DynamoDB-Streams-Kinesis-Adapter verwendet. Die Anwendung zeigt ein Beispiel für die Datenreplikation, wobei Schreibaktivitäten einer Tabelle auf eine zweite Tabelle angewendet werden und die Inhalte beider Tabellen synchron bleiben. Sie finden den Quellcode unter [Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Das Programm führt Folgendes aus:

1. Erstellt zwei DynamoDB-Tabellen namens `KCL-Demo-src` und `KCL-Demo-dst`. Für jede dieser Tabellen ist ein Stream aktiviert.

1. Generiert Aktualisierungsaktivitäten in der Quelltabelle durch Hinzufügen, Aktualisieren und Löschen von Elementen. Dies bewirkt, dass Daten in den Tabellenstream geschrieben werden.

1. Liest die Datensätze aus dem Stream, rekonstruiert diese als DynamoDB-Anforderungen und wendet die Anforderungen auf die Zieltabelle an.

1. Scannt die Quell- und Zieltabellen, um sicherzustellen, dass ihre Inhalte identisch sind.

1. Bereinigt die Daten durch Löschen der Tabellen.

Diese Schritte werden in den folgenden Abschnitten beschrieben und die vollständige Anwendung wird am Ende der Anleitung angezeigt.

**Topics**
+ [Schritt 1: Erstellen einer DynamoDB-Tabelle](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Schritt 3: Verarbeiten des Streams](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Schritt 5: Bereinigen](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Schritt 1: Erstellen einer DynamoDB-Tabelle
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Im ersten Schritt erstellen Sie zwei DynamoDB-Tabellen—eine Quelltabelle und eine Zieltabelle. Der `StreamViewType` des Streams der Quelltabelle lautet `NEW_IMAGE`. Das bedeutet, dass sobald ein Element in dieser Tabelle geändert wird, das Image des Elements nach der Änderung in den Stream geschrieben wird. So verfolgt der Stream alle Schreibaktivitäten der Tabelle.

Das folgende Beispiel zeigt den Code für das Erstellen von beiden Tabellen.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

Im nächsten Schritt erstellen Sie einige Schreibaktivitäten in der Quelltabelle. Während diese Aktivitäten ausgeführt werden, wird der Stream der Quelltabelle nahezu in Echtzeit ebenfalls aktualisiert.

Die Anwendung definiert die Hilfsprogrammklasse mit Methoden zum Aufrufen der API-Operationen `PutItem`, `UpdateItem` und `DeleteItem` zum Schreiben der Daten. Das folgende Codebeispiel zeigt, wie diese Methoden verwendet werden.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Schritt 3: Verarbeiten des Streams
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Das Programm beginnt mit der Verarbeitung des Streams. Der DynamoDB Streams Kinesis Adapter fungiert als transparente Ebene zwischen der KCL und dem DynamoDB Streams-Endpunkt, sodass der Code die KCL vollständig nutzen kann, statt DynamoDB-Streams-Low-Level-Aufrufe tätigen zu müssen. Das Programm führt die folgenden Aufgaben durch:
+ Es definiert eine Datensatzprozessor-Klasse, `StreamsRecordProcessor`, mit Methoden, die mit der KCL-Schnittstellendefinition übereinstimmen: `initialize`, `processRecords` und `shutdown`. Die `processRecords`-Methode enthält die Logik, die zum Lesen von der Quelltabelle des Streams und zum Schreiben in die Zieltabelle erforderlich ist.
+ Sie definiert eine ClassFactory für die Datensatzprozessor-Klasse (`StreamsRecordProcessorFactory`). Dies ist für Java-Programme, die die KCL verwenden, erforderlich.
+ Die Methode instanziiert eine neue KCL `Worker`, die der Class Factory zugeordnet ist.
+ Sie fährt `Worker` herunter, wenn die Datensatzverarbeitung abgeschlossen ist.

Aktivieren Sie optional den Aufholmodus in Ihrer Streams-KCL-Adapterkonfiguration, um die GetRecords API-Aufrufrate automatisch um das Dreifache zu skalieren (Standard), wenn die Verzögerung der Stream-Verarbeitung eine Minute überschreitet (Standard), sodass Ihr Stream-Consumer hohe Durchsatzspitzen in Ihrer Tabelle bewältigen kann.

Weitere Informationen zur KCL-Schnittstellendefinition finden Sie unter [Entwickeln von Konsumenten mithilfe der Kinesis-Clientbibliothek](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Amazon-Kinesis-Data-Streams-Entwicklerhandbuch*. 

Das folgende Codebeispiel zeigt die Hauptschleife in `StreamsRecordProcessor`. Die `case`-Anweisung bestimmt, welche Aktion basierend auf dem `OperationType`, der im Stream-Datensatz erscheint, durchgeführt werden soll.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

An diesem Punkt sind die Inhalte der Quell- und Zieltabellen synchronisiert. Die Anwendung gibt `Scan`-Anforderungen für beide Tabellen aus, um sicherzustellen, dass ihre Inhalte identisch sind.

Die `DemoHelper`-Klasse enthält eine `ScanTable`-Methode, die die `Scan`-Low-Level-API aufruft. Das Verfahren wird im folgenden Beispiel beschrieben.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Schritt 5: Bereinigen
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Die Demonstration ist abgeschlossen, so dass die Anwendung Quell- und Zieltabellen löscht. Beachten Sie hierzu das folgende Codebeispiel. Nachdem die Tabellen gelöscht wurden, bleiben die Streams für bis zu 24 Stunden verfügbar. Anschließend werden sie automatisch gelöscht.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Hier finden Sie das vollständige Java-Programm, das die in [Walkthrough: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.md) beschriebenen Aufgaben durchführt. Wenn Sie das Programm ausführen, wird eine Ausgabe ähnlich der folgenden angezeigt:

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Wichtig**  
 Um dieses Programm auszuführen, stellen Sie sicher, dass die Client-Anwendung CloudWatch mithilfe von Richtlinien Zugriff auf DynamoDB und Amazon hat. Weitere Informationen finden Sie unter [Identitätsbasierte Richtlinien für DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Der Quellcode besteht aus vier `.java` Dateien. Um dieses Programm zu erstellen, fügen Sie die folgende Abhängigkeit hinzu, zu der die Amazon Kinesis Client Library (KCL) 3.x und das AWS SDK for Java v2 als transitive Abhängigkeiten gehören:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Die Quelldateien sind:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```