

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Ändern Sie die Datenerfassung für DynamoDB Streams
<a name="Streams"></a>

 DynamoDB Streams erfasst eine zeitlich geordnete Abfolge von Änderungen auf Elementebene in jeder beliebigen DynamoDB-Tabelle und speichert diese Informationen bis zu 24 Stunden. Anwendungen können auf dieses Protokoll zugreifen und die Datenelemente vor und nach der Änderung nahezu in Echtzeit aufrufen.

 Die Verschlüsselung ruhender Daten verschlüsselt die Daten in DynamoDB Streams. Weitere Informationen finden Sie unter [Ruhende DynamoDB-Verschlüsselung](EncryptionAtRest.md).

Ein *DynamoDB-Stream* ist ein strukturierter Informationsfluss zu Elementänderungen in einer DynamoDB-Tabelle. Wenn Sie den Stream für eine Tabelle aktivieren, werden von DynamoDB Informationen über jede Änderung an den Datenelementen in der Tabelle erfasst.

Wenn eine Anwendung Elemente in der Tabelle erstellt, aktualisiert oder löscht, schreibt DynamoDB Streams einen Stream-Datensatz mit dem bzw. den Primärschlüsselattributen der Elemente, die geändert wurden. Ein *Stream-Datensatz* enthält Informationen über eine Datenänderung an einem einzelnen Element einer DynamoDB-Tabelle. Sie können den Stream konfigurieren, sodass die Stream-Datensätze zusätzliche Informationen erfassen, z. B. Images der geänderten Elemente vor und nach der Änderung.

Mit DynamoDB Streams wird Folgendes sichergestellt:
+ Jeder Stream-Datensatz erscheint genau einmal im Stream.
+ Für jedes Element, das in einer DynamoDB-Tabelle geändert wird, erscheinen die Stream-Datensätze in der gleichen Reihenfolge wie die tatsächlichen Änderungen des Elements.

DynamoDB Streams schreibt Stream-Datensätze nahezu in Echtzeit, sodass Sie Anwendungen erstellen können, die diese Streams verbrauchen und basierend auf den Inhalten Aktionen einleiten.

**Topics**
+ [

## Endpunkte für DynamoDB Streams
](#Streams.Endpoints)
+ [

## Aktivieren eines Streams
](#Streams.Enabling)
+ [

## Lesen und Verarbeiten eines Streams
](#Streams.Processing)
+ [

# DynamoDB Streams und Time to Live (TTL)
](time-to-live-ttl-streams.md)
+ [

# Verwenden des DynamoDB-Streams-Kinesis-Adapters zum Verarbeiten von Stream-Datensätzen
](Streams.KCLAdapter.md)
+ [

# DynamoDB-Streams-Low-Level-API: Java-Beispiel
](Streams.LowLevel.Walkthrough.md)
+ [

# DynamoDB Streams und -Trigger AWS Lambda
](Streams.Lambda.md)
+ [

# DynamoDB Streams und Apache Flink
](StreamsApacheFlink.xml.md)

## Endpunkte für DynamoDB Streams
<a name="Streams.Endpoints"></a>

AWS verwaltet separate Endpunkte für DynamoDB- und DynamoDB Streams. Für die Arbeit mit Datenbanktabellen und Indexen muss Ihre Anwendung auf einen DynamoDB-Endpunkt zugreifen. Um DynamoDB-Streams-Datensätze zu lesen und zu verarbeiten, muss die Anwendung auf einen DynamoDB-Streams-Endpunkt in derselben Region zugreifen.

DynamoDB Streams bietet zwei Gruppen von Endpunkten. Diese sind:
+ **IPv4-only Endpoints: Endpoints** mit der Benennungskonvention. `streams.dynamodb.<region>.amazonaws.com`
+ **Dual-Stack-Endpunkte**: Neue Endpunkte, die sowohl mit als auch kompatibel sind IPv4 und IPv6 der Namenskonvention folgen. `streams-dynamodb.<region>.api.aws`

**Anmerkung**  
Eine vollständige Liste der DynamoDB- und DynamoDB-Streams-Regionen und Endpunkte finden Sie unter [Regionen und Endpunkte](https://docs.aws.amazon.com/general/latest/gr/rande.html) in der *Allgemeine AWS-Referenz*.

 AWS SDKs Sie bieten separate Clients für DynamoDB- und DynamoDB Streams. Je nach Anforderung kann die Anwendung auf einen DynamoDB-Endpunkt, einen DynamoDB-Streams-Endpunkt oder beide gleichzeitig zugreifen. Für die Verbindung mit beiden Endpunkten muss Ihre Anwendung zwei Clients instanziieren, und zwar einen für DynamoDB und einen für DynamoDB Streams.

## Aktivieren eines Streams
<a name="Streams.Enabling"></a>

Sie können einen Stream in einer neuen Tabelle aktivieren, wenn Sie ihn mit der AWS CLI oder einer der folgenden Optionen erstellen. AWS SDKs Außerdem können Sie einen Stream in einer vorhandenen Tabelle aktivieren oder deaktivieren oder die Einstellungen eines Streams ändern. DynamoDB Streams wird asynchron betrieben. Daher wirkt es sich nicht auf die Leistung der Tabelle aus, wenn Sie einen Stream aktivieren.

Die einfachste Möglichkeit zum Verwalten von DynamoDB Streams bietet die AWS-Managementkonsole.

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die DynamoDB-Konsole unter. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Wählen Sie im Dashboard der DynamoDB-Konsole die Option **Tables** (Tabellen) aus und wählen Sie eine vorhandene Tabelle aus.

1. Wählen Sie die Registerkarte **Exports and streams** (Exporte und Streams).

1. Wählen Sie im Abschnitt **Details zum DynamoDB-Stream** die Option **Einschalten** aus.

1. Wählen Sie auf der Seite **DynamoDB-Stream einschalten** die Informationen aus, die in den Stream geschrieben werden sollen, sobald Daten in der Tabelle geändert werden:
   + **Nur Schlüsselattribute** – nur die Schlüsselattribute des geänderten Elements.
   + **Neues Image** – das gesamte Element wie es nach der Änderung erscheint.
   + **Altes Image** – das gesamte Element wie es vor der Änderung erscheint.
   + **Neues und altes Image** – sowohl das neue als auch das alte Image des Elements.

   Wenn Sie die gewünschten Einstellungen vorgenommen haben, wählen Sie **Stream einschalten** aus.

1. (Optional) Um einen vorhandenen Stream zu deaktivieren, wählen Sie unter **Details zum DynamoDB-Stream** die Option **Deaktivieren** aus.

Sie können auch die `CreateTable`- oder `UpdateTable`-API-Operationen zum Aktivieren oder Ändern eines Streams verwenden. Der Parameter `StreamSpecification` bestimmt, wie der Stream konfiguriert wird:
+ `StreamEnabled` – gibt an, ob ein Stream für die Tabelle aktiviert (`true`) oder deaktiviert (`false`) ist.
+ `StreamViewType` – legt die Informationen fest, die in den Stream geschrieben werden, sobald Daten in der Tabelle geändert werden:
  + `KEYS_ONLY` – nur die Schlüsselattribute des geänderten Elements.
  + `NEW_IMAGE` – das gesamte Element, wie es nach der Änderung erscheint.
  + `OLD_IMAGE` – das gesamte Element, wie es vor der Änderung erscheint.
  + `NEW_AND_OLD_IMAGES` – sowohl das neue als auch das alte Image des Elements.

Sie können einen Stream jederzeit aktivieren oder deaktivieren. Wenn Sie versuchen, einen Stream für eine Tabelle zu aktivieren, die bereits über einen Stream verfügt, erhalten Sie jedoch eine `ValidationException`. Wenn Sie versuchen, einen Stream für eine Tabelle zu deaktivieren, die nicht über einen Stream verfügt, erhalten Sie auch eine `ValidationException`.

Wenn Sie `StreamEnabled` auf `true` festlegen, erstellt DynamoDB einen neuen Stream mit einem zugewiesenen, eindeutigen Stream-Deskriptor. Wenn Sie einen Stream in der Tabelle deaktivieren und anschließend erneut aktivieren, wird ein neuer Stream mit einem anderen Stream-Deskriptor erstellt.

Jeder Stream wird anhand eines Amazon-Ressourcennamens (ARN) eindeutig identifiziert. Nachfolgend ist ein Beispiel-ARN für einen Stream in einer DynamoDB-Tabelle mit dem Namen `TestTable` aufgeführt:

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

Um den aktuellen Stream-Deskriptor für eine Tabelle zu bestimmen, erstellen Sie eine DynamoDB-Anforderung `DescribeTable` und suchen das Element `LatestStreamArn` in der Antwort.

**Anmerkung**  
Es ist nicht möglich, ein `StreamViewType` zu bearbeiten, sobald ein Stream eingerichtet wurde. Wenn Sie nach der Einrichtung Änderungen an einem Stream vornehmen möchten, müssen Sie den aktuellen Stream deaktivieren und einen neuen erstellen.

## Lesen und Verarbeiten eines Streams
<a name="Streams.Processing"></a>

Zum Lesen und Verarbeiten eines Streams muss Ihre Anwendung eine Verbindung mit einem DynamoDB-Streams-Endpunkt herstellen und API-Anforderungen ausgeben.

Ein Stream besteht aus *Stream-Datensätzen*. Jeder Stream-Datensatz stellt eine einzelne Datenänderung in der DynamoDB-Tabelle dar, zu der der Stream gehört. Jedem Stream-Datensatz ist eine Sequenznummer zugewiesen, wodurch die Reihenfolge dargestellt wird, in der der Datensatz im Stream veröffentlicht wurde.

Stream-Datensätze werden in Gruppen oder *Shards* verwaltet. Jeder Shard fungiert als Container für mehrere Stream-Datensätze und enthält Informationen, die zum Abrufen und Durchlaufen dieser Datensätze erforderlich sind. Die Stream-Datensätze in einem Shard werden nach 24 Stunden automatisch entfernt.

Shards sind flüchtig, d. h., sie werden nach Bedarf automatisch erstellt und gelöscht. Jeder Shard kann in mehrere neue Shards unterteilt werden. Dieser Vorgang erfolgt automatisch. (Es ist auch möglich, dass ein übergeordneter Shard nur einen untergeordneten Shard besitzt.) Ein Shard kann aufgrund hoher Schreibaktivitäten in der übergeordneten Tabelle aufgeteilt werden, sodass Anwendungen Datensätze aus mehreren Shards parallel verarbeiten können.

Wenn Sie einen Stream deaktivieren, werden alle offenen Shards geschlossen. Die Daten im Stream bleiben 24 Stunden lang lesbar.

Da Shards hierarchisch (über- und untergeordnet) aufgebaut sind, müssen Anwendungen einen übergeordneten Shard immer vor einem untergeordneten Shard verarbeiten. So wird sichergestellt, dass die Stream-Datensätze ebenfalls in der richtigen Reihenfolge verarbeitet werden. (Wenn Sie den DynamoDB-Streams-Kinesis-Adapter verwenden, wird dies für Sie erledigt. Ihre Anwendung verarbeitet die Shards und Streamdatensätze in der richtigen Reihenfolge. Es verarbeitet automatisch neue oder abgelaufene Shards sowie Shards, die sich während der Ausführung der Anwendung teilen. Weitere Informationen finden Sie unter [Verwenden des DynamoDB-Streams-Kinesis-Adapters zum Verarbeiten von Stream-Datensätzen](Streams.KCLAdapter.md).)

Das folgende Diagramm zeigt die Beziehung zwischen einem Stream, Shards im Stream und Stream-Datensätzen in den Shards.

![\[Struktur von DynamoDB Streams. Stream-Datensätze, die Datenänderungen darstellen, sind in Shards organisiert.\]](http://docs.aws.amazon.com/de_de/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**Anmerkung**  
Wenn Sie eine `PutItem` oder `UpdateItem` Operation ausführen, mit der keine Daten in einem Element geändert werden, schreibt DynamoDB Streams *keinen* Stream-Datensatz für diese Operation.

Um auf einen Stream zuzugreifen und die darin enthaltenen Stream-Datensätze zu verarbeiten, führen Sie die folgenden Schritte aus:
+ Ermitteln Sie den eindeutigen ARN des Streams, auf den Sie zugreifen möchten.
+ Bestimmen Sie, welcher bzw. welche Shards im Stream die gewünschten Stream-Datensätze enthalten.
+ Greifen Sie auf den bzw. die Shards zu und rufen Sie die gewünschten Stream-Datensätze ab.

**Anmerkung**  
Es sollten nicht mehr als zwei Prozesse gleichzeitig aus dem Shard desselben Streams lesen. Wenn mehr als zwei Leser pro Shard vorhanden sind, kann eine Drosselung die Folge sein.

Die DynamoDB Streams-API bietet die folgenden Aktionen zur Verwendung durch Anwendungsprogramme:
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)` – Gibt eine Liste der Stream-Deskriptoren für das aktuelle Konto und den Endpunkt zurück. Sie können optional nur die Stream-Deskriptoren für einen bestimmten Tabellennamen anfordern.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)` – Gibt Informationen über einen Stream zurück, z. B. den aktuellen Status des Streams, seinen Amazon-Ressourcennamen (ARN), die Zusammenstellung der Shards und die zugehörige DynamoDB-Tabelle. Sie können das Feld `ShardFilter` optional verwenden, um den vorhandenen untergeordneten Shard abzurufen, der dem übergeordneten Shard zugeordnet ist.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)` – Gibt einen *Shard Iterator* zurück, der eine Position innerhalb eines Shards beschreibt. Sie können anfordern, dass der Iterator Zugriff auf den ältesten Punkt, den neuesten Punkt oder einen bestimmten Punkt im Stream bereitstellt.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)` – Gibt die Stream-Datensätze innerhalb eines bestimmten Shards zurück. Sie müssen den von einer `GetShardIterator`-Anforderung zurückgegebenen Shard Iterator angeben.

Eine detaillierte Beschreibung dieser API-Aktionen, einschließlich Anforderungs- und Antwortbeispielen, finden Sie unter [Amazon-DynamoDB-Streams-API-Referenz](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html).

### Shard-Erkennung
<a name="Streams.ShardDiscovery"></a>



Mit zwei leistungsstarken Methoden können Sie neue Shards in Ihrem DynamoDB-Stream erkennen. Als Nutzer von Amazon DynamoDB Streams haben Sie zwei effektive Möglichkeiten, neue Shards zu verfolgen und zu identifizieren:

**Abfragen der gesamten Stream-Topologie**  
Verwenden Sie die `DescribeStream`-API, um den Stream regelmäßig abzufragen. Dadurch werden alle Shards im Stream zurückgegeben, einschließlich aller neu erstellten Shards. Indem Sie die Ergebnisse im Zeitverlauf vergleichen, können Sie neu hinzugefügte Shards erkennen.

**Erkennen untergeordneter Shards**  
Verwenden Sie die `DescribeStream`-API mit dem Parameter `ShardFilter`, um eine Teilmenge von Shards zu finden. Wenn Sie in der Anfrage einen übergeordneten Shard benennen, gibt DynamoDB Streams seine unmittelbar untergeordneten Shards zurück. Dieser Ansatz ist nützlich, wenn Sie nur die Herkunft der Shards verfolgen müssen, ohne den gesamten Stream zu scannen.   
Anwendungen, die Daten aus DynamoDB Streams aufnehmen, können mithilfe dieses `ShardFilter`-Parameters effizient vom Lesen eines geschlossenen Shards zum zugehörigen untergeordneten Shard übergehen. Dadurch werden wiederholte Aufrufe der `DescribeStream`-API vermieden, um die Shard-Zuordnung für alle geschlossenen und offenen Shards abzurufen und zu durchlaufen. So können untergeordnete Shards schnell erkannt werden, nachdem ein übergeordneter Shard geschlossen wurde. Auf diese Weise werden Stream-Verarbeitungsanwendungen reaktionsschneller und kostengünstiger.

Mit beiden Methoden können Sie den Überblick über die sich entwickelnde Struktur Ihrer DynamoDB Streams behalten, um keine wichtigen Datenaktualisierungen oder Shard-Änderungen zu verpassen.

### Datenaufbewahrungsfrist für DynamoDB Streams
<a name="Streams.DataRetention"></a>

Alle Daten in DynamoDB Streams unterliegen einer 24-Stunden-Nutzungsdauer. Sie können die Aktivitäten der letzten 24 Stunden für eine bestimmte Tabelle abrufen und analysieren. Daten, die älter als 24 Stunden sind, können jedoch jederzeit entfernt werden.

Wenn Sie einen Stream in einer Tabelle deaktivieren, bleiben die Daten im Stream 24 Stunden lang lesbar. Nach Ablauf dieses Zeitraums verfallen die Daten und die Stream-Datensätze werden automatisch gelöscht. Es gibt keinen Mechanismus zum manuellen Löschen eines vorhandenen Streams. Sie müssen nur warten, bis die Aufbewahrungsfrist abgelaufen ist (24 Stunden) und alle Stream-Datensätze gelöscht werden.

# DynamoDB Streams und Time to Live (TTL)
<a name="time-to-live-ttl-streams"></a>

Sie können Elemente, die durch die [Gültigkeitsdauer](TTL.md) (TTL) gelöscht wurden, sichern oder anderweitig verarbeiten, indem Sie Amazon DynamoDB Streams in der Tabelle aktivieren und die Streams-Datensätze der abgelaufenen Elemente verarbeiten. Weitere Informationen finden Sie unter [Lesen und Verarbeiten eines Streams](Streams.md#Streams.Processing).

Der Stream-Datensatz enthält ein Benutzeridentitätsfeld `Records[<index>].userIdentity`.

Die Elemente, die nach Ablauf durch den Gültigkeitsdauer-Prozess gelöscht wurden, haben die folgenden Felder:
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**Anmerkung**  
Wenn Sie TTL in einer globalen Tabelle verwenden, wird das Feld `userIdentity` für die Region, in der die TTL ausgeführt wurde, festgelegt. Dieses Feld wird in anderen Regionen nicht festgelegt, wenn der Löschvorgang repliziert wird.

Der folgende JSON-Ausdruck zeigt den relevanten Teil eines einzelnen Stream-Datensatzes.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## Verwenden von DynamoDB Streams und Lambda zum Archivieren von TTL-gelöschten Elementen
<a name="streams-archive-ttl-deleted-items"></a>

Die Kombination von [DynamoDB Time to Live (TTL)](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html), [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) und [AWS -Lambda](https://aws.amazon.com/lambda/) kann dazu beitragen, die Archivierung von Daten zu vereinfachen, die DynamoDB-Speicherkosten zu senken und die Codekomplexität zu reduzieren. Die Verwendung von Lambda als Stream-Konsument bietet viele Vorteile, insbesondere geringere Kosten im Vergleich zu anderen Konsumenten wie der Kinesis Client Library (KCL). `GetRecords`-API-Aufrufe in Ihrem DynamoDB-Stream werden Ihnen nicht in Rechnung gestellt, wenn Sie Lambda zum Verarbeiten von Ereignissen verwenden, und Lambda kann eine Ereignisfilterung bieten, indem es JSON-Muster in einem Stream-Ereignis identifiziert. Bei der Inhaltsfilterung nach Ereignismustern können Sie bis zu fünf verschiedene Filter definieren, um zu steuern, welche Ereignisse zur Verarbeitung an Lambda gesendet werden. Dies hilft, die Zahl der Aufrufe Ihrer Lambda-Funktionen zu reduzieren, den Code zu vereinfachen und die Gesamtkosten zu senken.

Zwar enthält DynamoDB Streams alle Datenänderungen, beispielsweise `Create`-, `Modify`- und `Remove`-Aktionen, dies kann jedoch zu unerwünschten Aufrufen Ihrer Lambda-Archivfunktion führen. Angenommen, Sie verfügen über eine Tabelle mit 2 Millionen Datenänderungen pro Stunde, die in den Stream fließen. Allerdings handelt es sich bei weniger als 5 Prozent dieser Änderungen um Elementlöschungen, die während des TTL-Prozesses ablaufen und archiviert werden müssen. Bei Verwendung von [Lambda-Ereignisquellenfiltern](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) wird die Lambda-Funktion nur 100 000-mal pro Stunde aufgerufen. Infolge der Ereignisfilterung werden Ihnen anstelle der 2 Millionen Aufrufe, die Sie ohne Ereignisfilterung hätten, nur die erforderlichen Aufrufe in Rechnung gestellt.

Die Ereignisfilterung wird auf die [Lambda-Ereignisquellenzuweisung](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html), angewendet. Hierbei handelt es sich um eine Ressource, die aus einem ausgewählten Ereignis – dem DynamoDB-Stream – Daten liest und eine Lambda-Funktion aufruft. Im folgenden Diagramm ist zu sehen, wie ein durch Time to Live gelöschtes Element von einer Lambda-Funktion unter Verwendung von Streams und Ereignisfiltern verarbeitet wird.

![\[Ein durch den TTL-Prozess gelöschtes Element startet eine Lambda-Funktion, die Streams und Ereignisfilter verwendet.\]](http://docs.aws.amazon.com/de_de/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### Ereignisfiltermuster für DynamoDB Time to Live
<a name="ttl-event-filter-pattern"></a>

Durch Hinzufügen der folgenden JSON zu den [Filterkriterien](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html) für Ihre Ereignisquellenzuweisung kann Ihre Lambda-Funktion nur für TTL-gelöschte Elemente aufgerufen werden:

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### AWS Lambda Erstellen Sie eine Zuordnung der Ereignisquellen
<a name="create-event-source-mapping"></a>

Verwenden Sie die folgenden Codeausschnitte, um eine gefilterte Ereignisquellenzuweisung zu erstellen, die Sie mit dem DynamoDB-Stream einer Tabelle verbinden können. Jeder Codeblock enthält das Ereignisfiltermuster.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# Verwenden des DynamoDB-Streams-Kinesis-Adapters zum Verarbeiten von Stream-Datensätzen
<a name="Streams.KCLAdapter"></a>

Die Verwendung des Amazon-Kinesis-Adapters ist die empfohlene Methode zum Verwenden von Streams aus Amazon DynamoDB. Die DynamoDB-Streams-API ist absichtlich ähnlich gestaltet wie die von Kinesis Data Streams. In beiden Services bestehen die Daten-Streams aus Shards. Dieses sind Container für Stream-Datensätze. Beide Dienste APIs enthalten`ListStreams`, `DescribeStream``GetShards`, und `GetShardIterator` Operationen. (Diese DynamoDB-Streams-Aktionen ähneln ihren Gegenstücken in Kinesis Data Streams, sind jedoch nicht vollkommen identisch.)

Als DynamoDB-Streams-Benutzer können Sie das Entwurfsmuster in der KCL zum Verarbeiten von DynamoDB-Streams-Shards und Stream-Datensätzen verwenden. Verwenden Sie dazu den DynamoDB-Streams-Kinesis-Adapter. Der Kinesis-Adapter implementiert die Kinesis-Data-Streams-Schnittstelle so, dass die KCL zum Verwenden und Verarbeiten von Datensätzen aus DynamoDB Streams eingesetzt werden kann. [Anweisungen zur Einrichtung und Installation des DynamoDB Streams Kinesis Adapters finden Sie im Repository. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

Sie können Anwendungen für Kinesis Data Streams mit der Kinesis Client Library (KCL) schreiben. Die KCL vereinfacht die Codierung durch Bereitstellen nützlicher Abstraktionen oberhalb der Low-Level-Kinesis-Data-Streams-API. Weitere Informationen zur KCL finden Sie im [Entwickeln von Konsumenten mit der Kinesis-Client-Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Entwicklerhandbuch zu Amazon Kinesis Data Streams*.

DynamoDB empfiehlt die Verwendung von KCL Version 3.x mit AWS SDK for Java v2.x. [Die aktuelle Version 1.x des DynamoDB Streams Kinesis Adapters mit AWS SDK für AWS SDK für Java v1.x wird während des gesamten Lebenszyklus weiterhin vollständig unterstützt, wie dies während der Übergangsphase gemäß den Wartungsrichtlinien für Tools vorgesehen war.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**Anmerkung**  
Die Versionen 1.x und 2.x der Amazon Kinesis Client Library (KCL) sind veraltet. KCL 1.x wird am 30. Januar 2026 verfügbar sein. end-of-support Wir empfehlen dringend, KCL-Anwendungen, die Version 1.x verwenden, vor dem 30. Januar 2026 zur neuesten KCL-Version zu migrieren. Die neueste KCL-Version finden Sie auf der Seite [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) unter. GitHub Weitere Informationen zur aktuellen Versionen finden Sie unter [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Weitere Informationen zur Migration von KCL 1.x zu KCL 3.x finden Sie unter „Migrating from KCL 1.x to KCL 3.x“.

Das folgende Diagramm zeigt, wie diese Bibliotheken miteinander interagieren.

![\[Interaktion zwischen DynamoDB Streams, Kinesis Data Streams und KCL zur Verarbeitung von DynamoDB-Streams-Datensätzen.\]](http://docs.aws.amazon.com/de_de/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Wenn Sie den DynamoDB-Streams-Kinesis-Adapter eingerichtet haben, können Sie die Entwicklung mit der KCL-Schnittstelle starten, wobei die API-Aufrufe nahtlos an den DynamoDB-Streams-Endpunkt gerichtet werden.

Beim Start der Anwendung wird die KCL aufgerufen, einen Worker zu instanziieren. Sie müssen dem Worker Konfigurationsinformationen für die Anwendung, wie z. B. den Stream-Deskriptor und die AWS Anmeldeinformationen, sowie den Namen einer von Ihnen angegebenen Datensatzprozessorklasse zur Verfügung stellen. Da der Code im Datensatzprozessor ausgeführt wird, erledigt der Worker die folgenden Aufgaben:
+ Stellt eine Verbindung mit dem Stream her
+ Listet die Shards innerhalb des Streams auf
+ Überprüft und listet untergeordnete Shards eines geschlossenen übergeordneten Shards innerhalb des Streams auf
+ Koordiniert Shard-Zuordnungen mit anderen Auftragnehmern (wenn vorhanden)
+ Instanziiert einen Datensatzverarbeiter für jeden Shard, der verwaltet wird
+ Ruft Datensätze aus dem Stream per Pull ab
+ Skaliert die GetRecords API-Aufrufrate bei hohem Durchsatz (wenn der Nachholmodus konfiguriert ist)
+ Überträgt per Push Datensätze an den entsprechenden Datensatzverarbeiter
+ Verwendet Checkpoints für verarbeitete Datensätze
+ Gleicht Shard-Auftragnehmer-Zuordnungen aus, wenn die Auftragnehmer-Instance Änderungen zählt
+ Gleicht Shard-Worker-Zuordnungen aus, wenn Shards aufgeteilt werden

Der KCL-Adapter unterstützt den Catch-up-Modus, eine Funktion zur automatischen Anpassung der Anrufrate zur Bewältigung vorübergehender Durchsatzerhöhungen. Wenn die Verzögerung bei der Stream-Verarbeitung einen konfigurierbaren Schwellenwert (standardmäßig eine Minute) überschreitet, skaliert der Aufholmodus die GetRecords API-Aufruffrequenz um einen konfigurierbaren Wert (Standard 3x), um Datensätze schneller abzurufen, und kehrt dann zum Normalzustand zurück, sobald die Verzögerung nachlässt. Dies ist in Zeiten mit hohem Durchsatz nützlich, in denen DynamoDB-Schreibaktivitäten die Verbraucher bei Verwendung der Standardabfrageraten überfordern können. Der Nachholmodus kann über den Konfigurationsparameter aktiviert werden (Standardeinstellung falsch). `catchupEnabled`

**Anmerkung**  
Eine Beschreibung der hier aufgeführten KCL-Konzepte finden Sie unter [Entwickeln von Konsumenten mithilfe der Kinesis-Clientbibliothek](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Amazon-Kinesis-Data-Streams-Entwicklerhandbuch*.  
Weitere Informationen zur Verwendung von Streams mit AWS Lambda [DynamoDB Streams und -Trigger AWS Lambda](Streams.Lambda.md)

# Migrieren von KCL 1.x zu KCL 3.x
<a name="streams-migrating-kcl"></a>

## -Übersicht
<a name="migrating-kcl-overview"></a>

In dieser Anleitung wird erläutert, wie Sie Ihre Verbraucheranwendung von KCL 1.x zu KCL 3.x migrieren. Aufgrund der unterschiedlichen Architektur von KCL 1.x und KCL 3.x müssen für die Migration mehrere Komponenten aktualisiert werden, um die Kompatibilität sicherzustellen.

KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. Sie müssen zuerst die Klassen „Datensatzprozessor“, „Datensatzprozessor-Factory“ und „Worker“ in das KCL 3.x-kompatible Format migrieren und dann die Schritte für die Migration von KCL 1.x zu KCL 3.x ausführen.

## Schritte zur Migration
<a name="migration-steps"></a>

**Topics**
+ [

### Schritt 1: Migrieren des Datensatzprozessors
](#step1-record-processor)
+ [

### Schritt 2: Migrieren der Datensatzprozessor-Factory
](#step2-record-processor-factory)
+ [

### Schritt 3: Migrieren des Workers
](#step3-worker-migration)
+ [

### Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x
](#step4-configuration-migration)
+ [

### Schritt 5: Migrieren von KCL 2.x zu KCL 3.x
](#step5-kcl2-to-kcl3)

### Schritt 1: Migrieren des Datensatzprozessors
<a name="step1-record-processor"></a>

Das folgende Beispiel zeigt einen Datensatzprozessor, der für die Version KCL 1.x des DynamoDB-Streams-Kinesis-Adapters implementiert wurde:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Um die RecordProcessor Klasse zu migrieren**

1. Ändern Sie die Schnittstellen von `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` und `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` folgendermaßen zu `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Aktualisieren Sie die Importanweisungen für die Methoden `initialize` und `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Ersetzen Sie die Methode `shutdownRequested` durch die folgenden neuen Methoden: `leaseLost`, `shardEnded` und `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Nachstehend finden Sie die aktualisierte Version der Datensatzprozessorklasse:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Anmerkung**  
DynamoDB Streams Kinesis Adapter verwendet SDKv2 jetzt das Record-Modell. In SDKv2 geben komplexe `AttributeValue` Objekte (`BS`,,, `NS` `M``L`,`SS`) niemals Null zurück. Verwenden Sie die Methoden `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()`, um zu überprüfen, ob diese Werte existieren.

### Schritt 2: Migrieren der Datensatzprozessor-Factory
<a name="step2-record-processor-factory"></a>

Die Datensatzprozessor-Factory ist für das Erstellen von Prozessoren verantwortlich, wenn eine Lease erworben wird. Nachfolgend sehen Sie ein Beispiel für eine KCL-1.x-Factory:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**So migrieren Sie die `RecordProcessorFactory`**
+ Ändern Sie die implementierte Schnittstelle von `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` folgendermaßen zu `software.amazon.kinesis.processor.ShardRecordProcessorFactory`:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Nachfolgend sehen Sie ein Beispiel für die Datensatzprozessor-Factory in 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Schritt 3: Migrieren des Workers
<a name="step3-worker-migration"></a>

In Version 3.0 der KCL wird die **Worker**-Klasse durch eine neue Klasse namens **Scheduler** ersetzt. Nachfolgend sehen Sie ein Beispiel für einen KCL-1.x-Worker.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**So migrieren Sie den Worker**

1. Ändern Sie die `import`-Anweisung für die `Worker`-Klasse in die Import-Anweisungen für die Klassen `Scheduler` und `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importieren Sie `StreamTracker` und ändern Sie den Import von `StreamsWorkerFactory` zu `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Wählen Sie die Position, von der aus die Anwendung gestartet werden soll. Möglich sind `TRIM_HORIZON` oder `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Erstellen Sie eine `StreamTracker`-Instance.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Erstellen Sie das Objekt `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Erstellen Sie das Objekt `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Erstellen Sie den `Scheduler` mithilfe von `ConfigsBuilder`, wie im folgenden Beispiel gezeigt:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Wichtig**  
Die Einstellung `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` gewährleistet die Kompatibilität zwischen dem DynamoDB-Streams-Kinesis-Adapter für KCL v3 und KCL v1, nicht zwischen KCL v2 und v3.

### Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x
<a name="step4-configuration-migration"></a>

Eine ausführliche Beschreibung der nach KCL 1.x eingeführten Konfigurationen, die für KCL 3.x relevant sind, finden Sie unter [KCL Configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) und [KCL Migration Client Configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Wichtig**  
Anstatt Objekte von `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` und `retrievalConfig` direkt zu erstellen, empfehlen wir, `ConfigsBuilder` zu verwenden, um Konfigurationen in KCL 3.x und aktuelleren Versionen einzustellen. Das verhindert Probleme mit der Scheduler-Initialisierung. `ConfigsBuilder` bietet eine flexiblere und wartungsfreundlichere Methode zur Konfiguration Ihrer KCL-Anwendung.

#### Konfigurationen mit dem Standardwert für Aktualisierungen in KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
In der KCL-Version 1.x ist der Standardwert für `billingMode` auf `PROVISIONED` eingestellt. Bei der KCL-Version 3.x ist die Standardeinstellung für `billingMode` jedoch `PAY_PER_REQUEST` (On-Demand-Modus). Wir empfehlen Ihnen, den On-Demand-Kapazitätsmodus für Ihre Leasetabelle zu verwenden, um die Kapazität automatisch an die Nutzung anzupassen. Anleitungen zur Verwendung der bereitgestellten Kapazität für Ihre Leasetabellen finden Sie unter [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
In der KCL-Version 1.x ist der Standardwert für `idleTimeBetweenReadsInMillis` auf 1 000 (oder 1 Sekunde) eingestellt. Die KCL-Version 3.x legt den Standardwert für `dleTimeBetweenReadsInMillis` auf 1 500 (oder 1,5 Sekunden) fest. Der Amazon-DynamoDB-Streams-Kinesis-Adapter überschreibt den Standardwert jedoch mit 1 000 (oder 1 Sekunde).

#### Neue Konfigurationen in KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Diese Konfiguration definiert das Zeitintervall, bis die Verarbeitung neu erkannter Shards beginnt. Es wird nach der Formel 1,5 × `leaseAssignmentIntervalMillis` berechnet. Wenn diese Einstellung nicht explizit konfiguriert ist, beträgt das Zeitintervall standardmäßig 1,5 × `failoverTimeMillis`. Die Verarbeitung neuer Shards beinhaltet das Scannen der Leasetabelle und das Abfragen eines globalen sekundären Index (GSI) in der Leasetabelle. Eine Absenkung des `leaseAssignmentIntervalMillis`-Werts erhöht die Häufigkeit dieser Scan- und Abfragevorgänge, was zu höheren DynamoDB-Kosten führt. Wir empfehlen, diesen Wert auf 2 000 (oder 2 Sekunden) einzustellen, um die Verzögerung bei der Verarbeitung neuer Shards zu minimieren.

`shardConsumerDispatchPollIntervalMillis`  
Diese Konfiguration definiert das Intervall zwischen aufeinanderfolgenden Abfragen durch den Shard-Verbraucher, um Zustandsübergänge auszulösen. In KCL-Version 1.x wurde dieses Verhalten durch den Parameter `idleTimeInMillis` gesteuert, der nicht als konfigurierbare Einstellung verfügbar war. Bei KCL-Version 3.x empfehlen wir, diese Konfiguration auf den Wert einzustellen, der in Ihrer KCL-Version 1.x für ` idleTimeInMillis` verwendet wurde.

### Schritt 5: Migrieren von KCL 2.x zu KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Um für einen reibungslosen Übergang und Kompatibilität mit der neuesten Version der Kinesis Client Library (KCL) zu sorgen, folgen Sie den Schritten 5–8 der Anleitung für das [Upgrade von KCL 2.x auf KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) im Migrationsleitfaden.

Informationen zur Fehlerbehebung in KCL 3.x finden Sie unter [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Rollback zu einer früheren KCL-Version
<a name="kcl-migration-rollback"></a>

In diesem Abschnitt wird erläutert, wie Sie Ihre Verbraucheranwendung auf die vorherige KCL-Version zurücksetzen können. Der Rollback-Prozess besteht aus zwei Schritten:

1. Führen Sie das [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) aus.

1. Stellen Sie den Code der vorherigen KCL-Version erneut bereit.

## Schritt 1: Ausführen des KCL Migration Tools
<a name="kcl-migration-rollback-step1"></a>

Wenn Sie zur vorherigen KCL-Version zurückkehren möchten, müssen Sie das KCL Migration Tool ausführen. Das Tool erfüllt zwei wichtige Aufgaben:
+ Es entfernt die Metadatentabelle für Worker-Metriken und den globalen sekundären Index aus der Leasetabelle in DynamoDB. Diese Artefakte werden von KCL 3.x erstellt, werden aber nicht benötigt, wenn Sie zur vorherigen Version zurückkehren.
+ Dadurch werden alle Worker in einem mit KCL 1.x kompatiblen Modus ausgeführt und verwenden wieder den Load-Balancing-Algorithmus aus früheren KCL-Versionen. Eventuelle Probleme mit dem neuen Load-Balancing-Algorithmus in KCL 3.x werden dadurch sofort behoben.

**Wichtig**  
Die Koordinatorstatustabelle in DynamoDB muss vorhanden sein und darf während des Migrations-, Rollback- und Rollforward-Prozesses nicht gelöscht werden.

**Anmerkung**  
Es ist wichtig, dass alle Worker in Ihrer Verbraucheranwendung zu einem bestimmten Zeitpunkt denselben Load-Balancing-Algorithmus verwenden. Das KCL Migration Tool sorgt dafür, dass alle Worker in Ihrer KCL-3.x-Verbraucheranwendung in den KCL-1.x-kompatiblen Modus wechseln. So führen alle Worker während des Rollbacks zur vorherigen KCL-Version denselben Load-Balancing-Algorithmus aus.

Sie können das [KCL-Migrationstool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) im Skriptverzeichnis des [ GitHubKCL-Repositorys](https://github.com/awslabs/amazon-kinesis-client/tree/master) herunterladen. Führen Sie das Skript über einen Worker oder Host aus, der berechtigt ist, in die Koordinatorstatustabelle, die Tabelle mit den Worker-Metriken und die Leasetabelle zu schreiben. Stellen Sie sicher, dass die entsprechenden [IAM-Berechtigungen](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) für KCL-Verbraucheranwendungen konfiguriert sind. Führen Sie das Skript mit dem folgenden Befehl nur einmal pro KCL-Anwendung aus:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Ersetze es durch *region* dein. AWS-Region

`--application_name`  
Dieser Parameter ist erforderlich, wenn Sie Standardnamen für Ihre DynamoDB-Metadatentabellen (Leasetabelle, Koordinatorstatustabelle und Worker-Metriktabelle) verwenden. Wenn Sie benutzerdefinierte Namen für diese Tabellen festgelegt haben, können Sie diesen Parameter weglassen. *applicationName*Ersetzen Sie es durch Ihren tatsächlichen KCL-Anwendungsnamen. Das Tool verwendet diesen Namen, um die Standardtabellennamen abzuleiten, wenn keine benutzerdefinierten Namen angegeben werden.

`--lease_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Leasetabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *leaseTableName*Ersetzen Sie durch den benutzerdefinierten Tabellennamen, den Sie für Ihre Leasingtabelle angegeben haben.

`--coordinator_state_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Koordinatorstatustabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *coordinatorStateTableName*Ersetzen Sie es durch den benutzerdefinierten Tabellennamen, den Sie für Ihre Koordinatorstatentabelle angegeben haben.

`--worker_metrics_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Worker-Metriktabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *workerMetricsTableName*Ersetzen Sie es durch den Namen der benutzerdefinierten Tabelle, den Sie für Ihre Tabelle mit Arbeitskennzahlen angegeben haben.

## Schritt 2: Erneutes Bereitstellen des Codes mit der vorherigen KCL-Version
<a name="kcl-migration-rollback-step2"></a>

**Wichtig**  
Jede Erwähnung von Version 2.x in der vom KCL Migration Tool generierten Ausgabe sollte als Bezugnahme auf KCL-Version 1.x interpretiert werden. Durch die Ausführung des Skripts erfolgt kein vollständiges Rollback. Es wird lediglich der Load-Balancing-Algorithmus auf den in KCL Version 1.x verwendeten umgestellt.

Nachdem das KCL Migration Tool den Rollback ausgeführt hat, wird eine der folgenden Meldungen angezeigt:

Nachricht 1  
„Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version.“  
**Erforderliche Maßnahme:** Dies bedeutet, dass Ihre Worker im KCL-1.x-kompatiblen Modus ausgeführt wurden. Stellen Sie den Code für Ihre Worker erneut mit der vorherigen KCL-Version bereit.

Nachricht 2  
„Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version.“  
**Erforderliche Maßnahme:** Dies bedeutet, dass Ihre Worker im KCL-3.x-kompatiblen Modus ausgeführt wurden und das KCL Migration Tool alle Worker auf den KCL-1.x-kompatiblen Modus zurückgesetzt hat. Stellen Sie den Code für Ihre Worker erneut mit der vorherigen KCL-Version bereit.

Nachricht 3  
„Application was already rolled back. Alle KCLv3 Ressourcen, die gelöscht werden könnten, wurden gelöscht, um Gebühren zu vermeiden, bis die Anwendung im Rahmen der Migration weiterentwickelt werden kann.“  
**Erforderliche Maßnahme:** Dies bedeutet, dass Ihre Worker bereits in den KCL-1.x-kompatiblen Modus zurückversetzt wurden. Stellen Sie den Code für Ihre Worker erneut mit der vorherigen KCL-Version bereit.

# Rollforward zu KCL 3.x nach einem Rollback
<a name="kcl-migration-rollforward"></a>

In diesem Abschnitt wird erläutert, wie Sie Ihre Verbraucheranwendung nach einem Rollback wieder auf KCL 3.x umstellen können. Wenn Sie einen Rollforward durchführen müssen, sind zwei Schritte erforderlich:

1. Führen Sie das [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) aus.

1. Stellen Sie den Code mit KCL 3.x bereit.

## Schritt 1: Ausführen des KCL Migration Tools
<a name="kcl-migration-rollforward-step1"></a>

Führen Sie das KCL Migration Tool mit dem folgenden Befehl aus, um KCL 3.x wiederherzustellen:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Ersetze es *region* durch dein AWS-Region.

`--application_name`  
Dieser Parameter ist erforderlich, wenn Sie Standardnamen für die Koordinatorstatustabelle verwenden. Wenn Sie benutzerdefinierte Namen für diese Tabelle festgelegt haben, können Sie diesen Parameter weglassen. *applicationName*Ersetzen Sie es durch Ihren tatsächlichen KCL-Anwendungsnamen. Das Tool verwendet diesen Namen, um die Standardtabellennamen abzuleiten, wenn keine benutzerdefinierten Namen angegeben werden.

`--coordinator_state_table_name`  
Dieser Parameter wird benötigt, wenn Sie in Ihrer KCL-Konfiguration einen benutzerdefinierten Namen für die Koordinatorstatustabelle festgelegt haben. Wenn Sie den Standardtabellennamen verwenden, können Sie diesen Parameter weglassen. *coordinatorStateTableName*Ersetzen Sie durch den benutzerdefinierten Tabellennamen, den Sie für Ihre Koordinatorstatustabelle angegeben haben.

Nachdem Sie das Migrationstool im Roll-Forward-Modus ausgeführt haben, erstellt KCL die folgenden DynamoDB-Ressourcen, die für KCL 3.x erforderlich sind:
+ einen globalen sekundären Index für die Leasetabelle
+ eine Tabelle mit Worker-Metriken

## Schritt 2: Bereitstellen des Codes mit KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Nachdem Sie den Rollforward mit dem KCL Migration Tool ausgeführt haben, stellen Sie Ihren Code mit KCL 3.x für Ihre Worker bereit. Informationen zum Abschließen Ihrer Migration finden Sie unter [Schritt 8: Abschließen der Migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Walkthrough: DynamoDB-Streams-Kinesis-Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

In diesem Abschnitt wird eine Anleitung für eine Java-Anwendung gegeben, die die Amazon-Kinesis-Client-Library und den Amazon-DynamoDB-Streams-Kinesis-Adapter verwendet. Die Anwendung zeigt ein Beispiel für die Datenreplikation, wobei Schreibaktivitäten einer Tabelle auf eine zweite Tabelle angewendet werden und die Inhalte beider Tabellen synchron bleiben. Sie finden den Quellcode unter [Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Das Programm führt Folgendes aus:

1. Erstellt zwei DynamoDB-Tabellen namens `KCL-Demo-src` und `KCL-Demo-dst`. Für jede dieser Tabellen ist ein Stream aktiviert.

1. Generiert Aktualisierungsaktivitäten in der Quelltabelle durch Hinzufügen, Aktualisieren und Löschen von Elementen. Dies bewirkt, dass Daten in den Tabellenstream geschrieben werden.

1. Liest die Datensätze aus dem Stream, rekonstruiert diese als DynamoDB-Anforderungen und wendet die Anforderungen auf die Zieltabelle an.

1. Scannt die Quell- und Zieltabellen, um sicherzustellen, dass ihre Inhalte identisch sind.

1. Bereinigt die Daten durch Löschen der Tabellen.

Diese Schritte werden in den folgenden Abschnitten beschrieben und die vollständige Anwendung wird am Ende der Anleitung angezeigt.

**Topics**
+ [

## Schritt 1: Erstellen einer DynamoDB-Tabelle
](#Streams.KCLAdapter.Walkthrough.Step1)
+ [

## Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
](#Streams.KCLAdapter.Walkthrough.Step2)
+ [

## Schritt 3: Verarbeiten des Streams
](#Streams.KCLAdapter.Walkthrough.Step3)
+ [

## Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
](#Streams.KCLAdapter.Walkthrough.Step4)
+ [

## Schritt 5: Bereinigen
](#Streams.KCLAdapter.Walkthrough.Step5)
+ [

# Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter
](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Schritt 1: Erstellen einer DynamoDB-Tabelle
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Im ersten Schritt erstellen Sie zwei DynamoDB-Tabellen—eine Quelltabelle und eine Zieltabelle. Der `StreamViewType` des Streams der Quelltabelle lautet `NEW_IMAGE`. Das bedeutet, dass sobald ein Element in dieser Tabelle geändert wird, das Image des Elements nach der Änderung in den Stream geschrieben wird. So verfolgt der Stream alle Schreibaktivitäten der Tabelle.

Das folgende Beispiel zeigt den Code für das Erstellen von beiden Tabellen.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

Im nächsten Schritt erstellen Sie einige Schreibaktivitäten in der Quelltabelle. Während diese Aktivitäten ausgeführt werden, wird der Stream der Quelltabelle nahezu in Echtzeit ebenfalls aktualisiert.

Die Anwendung definiert die Hilfsprogrammklasse mit Methoden zum Aufrufen der API-Operationen `PutItem`, `UpdateItem` und `DeleteItem` zum Schreiben der Daten. Das folgende Codebeispiel zeigt, wie diese Methoden verwendet werden.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Schritt 3: Verarbeiten des Streams
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Das Programm beginnt mit der Verarbeitung des Streams. Der DynamoDB Streams Kinesis Adapter fungiert als transparente Ebene zwischen der KCL und dem DynamoDB Streams-Endpunkt, sodass der Code die KCL vollständig nutzen kann, statt DynamoDB-Streams-Low-Level-Aufrufe tätigen zu müssen. Das Programm führt die folgenden Aufgaben durch:
+ Es definiert eine Datensatzprozessor-Klasse, `StreamsRecordProcessor`, mit Methoden, die mit der KCL-Schnittstellendefinition übereinstimmen: `initialize`, `processRecords` und `shutdown`. Die `processRecords`-Methode enthält die Logik, die zum Lesen von der Quelltabelle des Streams und zum Schreiben in die Zieltabelle erforderlich ist.
+ Sie definiert eine ClassFactory für die Datensatzprozessor-Klasse (`StreamsRecordProcessorFactory`). Dies ist für Java-Programme, die die KCL verwenden, erforderlich.
+ Die Methode instanziiert eine neue KCL `Worker`, die der Class Factory zugeordnet ist.
+ Sie fährt `Worker` herunter, wenn die Datensatzverarbeitung abgeschlossen ist.

Aktivieren Sie optional den Aufholmodus in Ihrer Streams-KCL-Adapterkonfiguration, um die GetRecords API-Aufrufrate automatisch um das Dreifache zu skalieren (Standard), wenn die Verzögerung der Stream-Verarbeitung eine Minute überschreitet (Standard), sodass Ihr Stream-Consumer hohe Durchsatzspitzen in Ihrer Tabelle bewältigen kann.

Weitere Informationen zur KCL-Schnittstellendefinition finden Sie unter [Entwickeln von Konsumenten mithilfe der Kinesis-Clientbibliothek](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) im *Amazon-Kinesis-Data-Streams-Entwicklerhandbuch*. 

Das folgende Codebeispiel zeigt die Hauptschleife in `StreamsRecordProcessor`. Die `case`-Anweisung bestimmt, welche Aktion basierend auf dem `OperationType`, der im Stream-Datensatz erscheint, durchgeführt werden soll.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

An diesem Punkt sind die Inhalte der Quell- und Zieltabellen synchronisiert. Die Anwendung gibt `Scan`-Anforderungen für beide Tabellen aus, um sicherzustellen, dass ihre Inhalte identisch sind.

Die `DemoHelper`-Klasse enthält eine `ScanTable`-Methode, die die `Scan`-Low-Level-API aufruft. Das Verfahren wird im folgenden Beispiel beschrieben.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Schritt 5: Bereinigen
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Die Demonstration ist abgeschlossen, so dass die Anwendung Quell- und Zieltabellen löscht. Beachten Sie hierzu das folgende Codebeispiel. Nachdem die Tabellen gelöscht wurden, bleiben die Streams für bis zu 24 Stunden verfügbar. Anschließend werden sie automatisch gelöscht.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Hier finden Sie das vollständige Java-Programm, das die in [Walkthrough: DynamoDB-Streams-Kinesis-Adapter](Streams.KCLAdapter.Walkthrough.md) beschriebenen Aufgaben durchführt. Wenn Sie das Programm ausführen, wird eine Ausgabe ähnlich der folgenden angezeigt:

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Wichtig**  
 Um dieses Programm auszuführen, stellen Sie sicher, dass die Client-Anwendung CloudWatch mithilfe von Richtlinien Zugriff auf DynamoDB und Amazon hat. Weitere Informationen finden Sie unter [Identitätsbasierte Richtlinien für DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Der Quellcode besteht aus vier `.java` Dateien. Um dieses Programm zu erstellen, fügen Sie die folgende Abhängigkeit hinzu, zu der die Amazon Kinesis Client Library (KCL) 3.x und das AWS SDK for Java v2 als transitive Abhängigkeiten gehören:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Die Quelldateien sind:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# DynamoDB-Streams-Low-Level-API: Java-Beispiel
<a name="Streams.LowLevel.Walkthrough"></a>

**Anmerkung**  
Der Code auf dieser Seite ist nicht umfassend und behandelt nicht alle Szenarien für die Nutzung von Amazon DynamoDB Streams. Die empfohlene Methode für die Nutzung von Stream-Datensätzen aus DynamoDB erfolgt anhand des Amazon-Kinesis-Adapters unter Verwendung der Kinesis-Client-Library (KCL) gemäß der Beschreibung in [Verwenden des DynamoDB-Streams-Kinesis-Adapters zum Verarbeiten von Stream-Datensätzen](Streams.KCLAdapter.md).

Dieser Abschnitt enthält ein Java-Programm, das DynamoDB Streams in Aktion zeigt. Das Programm führt Folgendes aus:

1. Erstellt eine DynamoDB-Tabelle mit einem aktivierten Stream.

1. Beschreibt die Stream-Einstellungen für diese Tabelle.

1. Ändert die Daten in der Tabelle.

1. Beschreibt die Shards im Stream.

1. Liest die Stream-Datensätze aus den Shards.

1. Ruft untergeordnete Shards ab und setzt das Lesen von Datensätzen fort.

1. Bereinigt die Daten.

Wenn Sie das Programm ausführen, wird eine Ausgabe ähnlich der folgenden angezeigt.

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example Beispiel**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams und -Trigger AWS Lambda
<a name="Streams.Lambda"></a>

Amazon DynamoDB ist integriert, AWS Lambda sodass Sie *Trigger* erstellen können — Codeteile, die automatisch auf Ereignisse in DynamoDB Streams reagieren. Mit Auslösern können Sie Anwendungen erstellen, die auf Datenänderungen in DynamoDB-Tabellen reagieren.

**Topics**
+ [

# Tutorial \$11: Verwenden von Filtern zur Verarbeitung aller Ereignisse mit Amazon DynamoDB und AWS Lambda Verwendung der AWS CLI
](Streams.Lambda.Tutorial.md)
+ [

# Tutorial 2: Verwenden von Filtern, um einige Ereignisse mit DynamoDB und Lambda zu verarbeiten
](Streams.Lambda.Tutorial2.md)
+ [

# Bewährte Methoden zur Verwendung von DynamoDB Streams mit Lambda
](Streams.Lambda.BestPracticesWithDynamoDB.md)

Wenn Sie DynamoDB Streams für eine Tabelle aktivieren, können Sie den Stream Amazon Resource Name (ARN) mit einer von Ihnen geschriebenen AWS Lambda Funktion verknüpfen. Alle Mutationsaktionen für diese DynamoDB-Tabelle können dann als Element im Stream erfasst werden. Sie können beispielsweise einen Auslöser festlegen, sodass beim Ändern eines Elements in einer Tabelle sofort ein neuer Datensatz im Stream der betreffenden Tabelle angezeigt wird. 

**Anmerkung**  
Wenn Sie mehr als zwei Lambda-Funktionen für einen DynamoDB-Stream abonnieren, kann es zu einer Drosselung der Lesegeschwindigkeit kommen.

Der [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html)-Service fragt den Stream viermal pro Sekunde nach neuen Datensätzen ab. Wenn neue Stream-Datensätze verfügbar sind, wird Ihre Lambda-Funktion synchron aufgerufen. Sie können bis zu zwei Lambda-Funktionen für denselben DynamoDB-Stream abonnieren. Wenn Sie mehr als zwei Lambda-Funktionen für denselben DynamoDB-Stream abonnieren, kann es zu einer Drosselung der Lesegeschwindigkeit kommen.

Die Lambda-Funktion kann eine Benachrichtigung senden, einen Workflow einleiten oder viele andere von Ihnen festgelegte Aktionen durchführen. Sie können eine Lambda-Funktion schreiben, um jeden Stream-Datensatz einfach in persistenten Speicher wie z. B. Amazon S3 File Gateway (Amazon S3) zu kopieren und einen permanenten Prüfungs-Trail der Schreibaktivitäten in Ihrer Tabelle zu erstellen. Angenommen, Sie verfügen über eine mobile Gaming-App, die Schreibvorgänge in einer `GameScores`-Tabelle vornimmt. Bei jedem Aktualisieren des `TopScore`-Attributs der `GameScores`-Tabelle wird ein entsprechender Stream-Datensatz in den Stream der Tabelle geschrieben. Dieses Ereignis kann dann automatisch eine Lambda-Funktion auslösen, die einen Glückwunsch in einem Social-Media-Netzwerk postet. Diese Funktion könnte auch so geschrieben werden, dass sie alle Stream-Datensätze ignoriert, die kein Update von `GameScores` sind oder das `TopScore`-Attribut nicht ändern.

Wenn Ihre Funktion einen Fehler zurückgibt, wiederholt Lambda den Vorgang mit dem Batch, bis die Verarbeitung erfolgreich ist oder die Daten ablaufen. Sie können Lambda auch so konfigurieren, dass eine Wiederholung mit einem kleineren Batch erfolgt, die Anzahl der Wiederholungen eingeschränkt wird, Datensätze verworfen werden, sobald sie zu alt sind, und vieles mehr.

Gemäß bewährten Methoden für die Leistung muss die Lambda-Funktion kurzlebig sein. Um unnötige Verarbeitungsverzögerungen zu vermeiden, sollte sie auch keine komplexe Logik ausführen. Insbesondere bei einem Hochgeschwindigkeits-Stream ist es besser, asynchrone Nachbearbeitungs-Schrittfunktions-Workflows auszulösen als synchrone Lambdas mit langer Laufzeit.

 Sie können Lambda-Trigger für verschiedene AWS Konten verwenden, indem Sie eine ressourcenbasierte Richtlinie für den DynamoDB-Stream konfigurieren, um der kontoübergreifenden Lesezugriff auf die Lambda-Funktion zu gewähren. Weitere Informationen darüber, wie Sie Ihren Stream so konfigurieren, dass er kontoübergreifenden Zugriff ermöglicht, finden Sie unter [Zugriff mit kontoübergreifenden AWS Lambda-Funktionen teilen](rbac-cross-account-access.md#shared-access-cross-acount-lambda) im DynamoDB-Entwicklerhandbuch.

[Weitere Informationen AWS Lambda dazu finden Sie im Entwicklerhandbuch.AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/)

# Tutorial \$11: Verwenden von Filtern zur Verarbeitung aller Ereignisse mit Amazon DynamoDB und AWS Lambda Verwendung der AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

In diesem Tutorial erstellen Sie einen AWS Lambda Trigger zur Verarbeitung eines Streams aus einer DynamoDB-Tabelle.

**Topics**
+ [

## Schritt 1: Erstellen einer DynamoDB-Tabelle mit einem aktivierten Stream
](#Streams.Lambda.Tutorial.CreateTable)
+ [

## Schritt 2: Erstellen einer Lambda-Ausführungsrolle
](#Streams.Lambda.Tutorial.CreateRole)
+ [

## Schritt 3: Erstellen eines Amazon-SNS-Themas
](#Streams.Lambda.Tutorial.SNSTopic)
+ [

## Schritt 4: Erstellen und Testen einer Lambda-Funktion
](#Streams.Lambda.Tutorial.LambdaFunction)
+ [

## Schritt 5: Erstellen und Testen eines Auslösers
](#Streams.Lambda.Tutorial.CreateTrigger)

Das Szenario für dieses Tutorial ist Woofer, ein einfaches soziales Netzwerk. Woofer-Benutzer kommunizieren mit einer kurzen Textnachricht *Barks* („Bellen”), die an andere Woofer-Benutzer gesendet werden. Das folgende Diagramm zeigt die Komponenten und den Workflow für diese Anwendung.

![\[Woofer-Anwendungsworkflow mit einer DynamoDB-Tabelle, einem Stream-Datensatz, einer Lambda-Funktion und einem Amazon-SNS-Thema\]](http://docs.aws.amazon.com/de_de/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. Ein Benutzer schreibt ein Element in eine DynamoDB-Tabelle (`BarkTable`). Jedes Element in der Tabelle steht für eine Textnachricht („Bark”).

1. Ein neuer Stream-Datensatz wird erstellt, um zu berücksichtigen, dass ein neues Element der `BarkTable` hinzugefügt wurde.

1. Der neue Stream-Record löst eine AWS Lambda Funktion () `publishNewBark` aus.

1. Wenn der Stream-Datensatz angibt, dass ein neues Element `BarkTable` hinzugefügt wurde, liest die Lambda-Funktion die Daten aus dem Stream-Datensatz und veröffentlicht eine Nachricht für ein Thema im Amazon Simple Notification Service (Amazon SNS).

1. Die Nachricht wird von den Abonnenten des Amazon-SNS-Themas empfangen. (In diesem Tutorial ist der einzige Abonnent eine E-Mail-Adresse.)

**Bevor Sie beginnen**  
Dieses Tutorial verwendet die AWS Command Line Interface AWS CLI. Folgen Sie den Anweisungen im [AWS Command Line Interface -Benutzerhandbuch](https://docs.aws.amazon.com/cli/latest/userguide/) zum Installieren und Konfigurieren der AWS CLI.

## Schritt 1: Erstellen einer DynamoDB-Tabelle mit einem aktivierten Stream
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

In diesem Schritt erstellen Sie eine DynamoDB-Tabelle (`BarkTable`), um alle Textnachrichten von Woofer-Benutzern zu speichern. Der Primärschlüssel besteht aus `Username` (Partitionsschlüssel) und `Timestamp` (Sortierschlüssel). Bei beiden Attributen handelt es sich um Zeichenfolgen.

Für `BarkTable` ist ein Stream aktiviert. Später in diesem Tutorial erstellen Sie einen Trigger, indem Sie dem Stream eine AWS Lambda Funktion zuordnen.

1. Verwenden Sie den folgenden Befehl, um die Tabelle zu erstellen.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. Suchen Sie in der Ausgabe nach dem `LatestStreamArn`.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   Notieren Sie sich die `region` und die `accountID`, da Sie sie für die anderen Schritte in diesem Tutorial benötigen.

## Schritt 2: Erstellen einer Lambda-Ausführungsrolle
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

In diesem Schritt erstellen Sie eine AWS Identity and Access Management (IAM-) Rolle (`WooferLambdaRole`) und weisen ihr Berechtigungen zu. Diese Rolle wird von der Lambda-Funktion verwendet, die Sie in [Schritt 4: Erstellen und Testen einer Lambda-Funktion](#Streams.Lambda.Tutorial.LambdaFunction) erstellen. 

Außerdem legen Sie eine Richtlinie für die Rolle fest. Die Richtlinie enthält alle Berechtigungen, die die Lambda-Funktion zur Laufzeit benötigt.

1. Erstellen Sie eine Datei mit dem Namen `trust-relationship.json` und dem folgenden Inhalt.

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. Geben Sie den folgenden Befehl ein, um `WooferLambdaRole` zu erstellen.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. Erstellen Sie eine Datei mit dem Namen `role-policy.json` und dem folgenden Inhalt. (Ersetzen Sie `region` und `accountID` durch Ihre AWS Region und Konto-ID.)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   Die Richtlinie enthält vier Anweisungen, damit `WooferLambdaRole` folgende Aufgaben ausführen kann:
   + Führen Sie eine Lambda-Funktion (`publishNewBark`). Die Funktion erstellen Sie zu einem späteren Zeitpunkt in diesem Tutorial.
   + Greifen Sie auf Amazon CloudWatch Logs zu. Die Lambda-Funktion schreibt zur Laufzeit Diagnosen in CloudWatch Logs.
   + Lesen von Daten aus dem DynamoDB Stream für `BarkTable`.
   + Veröffentlichen Sie Nachrichten in Amazon SNS.

1. Führen Sie den folgenden Befehl aus, um die Richtlinie `WooferLambdaRole` anzufügen:

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## Schritt 3: Erstellen eines Amazon-SNS-Themas
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

In diesem Schritt erstellen Sie ein Amazon-SNS-Thema (`wooferTopic`) und abonnieren es für eine E-Mail-Adresse. Ihre Lambda-Funktion verwendet dieses Thema zum Veröffentlichen neuer Barks von Woofer-Benutzern.

1. Geben Sie den folgenden Befehl ein, um ein neues Amazon-SNS-Thema zu erstellen.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. Geben Sie den folgenden Befehl ein, um `wooferTopic` für eine E-Mail-Adresse zu abonnieren. (Ersetzen Sie `region` bzw. `accountID` durch Ihre AWS -Region bzw. die Konto-ID und `example@example.com` durch eine gültige E-Mail-Adresse.)

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS sendet eine Bestätigungsnachricht an Ihre E-Mail-Adresse. Klicken Sie auf den Link **Confirm subscription** (Abonnement bestätigen) in dieser E-Mail, um Ihr Abonnement abzuschließen.

## Schritt 4: Erstellen und Testen einer Lambda-Funktion
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

In diesem Schritt erstellen Sie eine AWS Lambda Funktion (`publishNewBark`) zur Verarbeitung von `BarkTable` Stream-Datensätzen.

Die `publishNewBark`-Funktion verarbeitet nur die Stream-Ereignisse, die neuen Elementen in `BarkTable` entsprechen. Die Funktion liest Daten eines entsprechenden Ereignisses und ruft Amazon SNS auf, um es zu veröffentlichen.

1. Erstellen Sie eine Datei mit dem Namen `publishNewBark.js` und dem folgenden Inhalt. Ersetzen Sie `region` und `accountID` durch Ihre AWS Region und Konto-ID.

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. Erstellen Sie eine ZIP-Datei mit `publishNewBark.js`. Wenn Sie das ZIP-Befehlszeilen-Dienstprogramm verwenden, geben Sie dazu den folgenden Befehl ein.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Beim Erstellen der Lambda-Funktion geben Sie den Amazon-Ressourcennamen (ARN) für `WooferLambdaRole` an, den Sie in [Schritt 2: Erstellen einer Lambda-Ausführungsrolle](#Streams.Lambda.Tutorial.CreateRole) erstellt haben. Geben Sie den folgenden Befehl ein, um diesen ARN abzurufen.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   Suchen Sie in der Ausgabe nach dem ARN für `WooferLambdaRole`.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Verwenden Sie den folgenden Befehl, um die Lambda-Funktion zu erstellen. *roleARN*Ersetzen Sie es durch den ARN für`WooferLambdaRole`.

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. Testen Sie die `publishNewBark`-Funktion. Stellen Sie dazu eine Eingabe bereit, die einem echten Datensatz aus DynamoDB Streams ähnelt.

   Erstellen Sie eine Datei mit dem Namen `payload.json` und dem folgenden Inhalt. Ersetzen Sie `region` und `accountID` durch Ihre AWS-Region und die Konto-ID:

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   Verwenden Sie den folgenden Befehl, um die `publishNewBark`-Funktion zu erstellen.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   Wenn der Test erfolgreich war, sehen Sie die folgende Ausgabe.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   Darüber hinaus enthält die `output.txt`-Datei folgenden Text.

   ```
   "Successfully processed 1 records."
   ```

   Außerdem erhalten Sie in wenigen Minuten eine neue E-Mail-Nachricht.
**Anmerkung**  
AWS Lambda schreibt Diagnoseinformationen in Amazon CloudWatch Logs. Wenn Fehler mit Ihrer Lambda-Funktion auftreten, können Sie folgende Diagnoseverfahren zur Fehlerbehebung verwenden:  
Öffnen Sie die CloudWatch Konsole unter [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Wählen Sie im Navigationsbereich **Logs** (Logs) aus.
Wählen Sie die folgende Protokollgruppe aus: `/aws/lambda/publishNewBark`
Wählen Sie den aktuellen Protokoll-Stream aus, um die Ausgabe (und Fehler) der Funktion zu sehen.

## Schritt 5: Erstellen und Testen eines Auslösers
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

In [Schritt 4: Erstellen und Testen einer Lambda-Funktion](#Streams.Lambda.Tutorial.LambdaFunction) haben Sie die Lambda-Funktion getestet, um sicherzustellen, dass sie korrekt ausgeführt wird. In diesem Schritt erstellen Sie einen *Auslöser*, indem Sie die Lambda-Funktion (`publishNewBark`) mit einer Ereignisquelle (dem `BarkTable`-Stream) verknüpfen.

1. Wenn Sie den Auslöser erstellen, müssen Sie den ARN für den `BarkTable`-Stream angeben. Geben Sie den folgenden Befehl ein, um diesen ARN abzurufen.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   Suchen Sie in der Ausgabe nach dem `LatestStreamArn`.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. Geben Sie den folgenden Befehl ein, um den Auslöser zu erstellen. Ersetzen Sie `streamARN` durch den tatsächlichen Stream-ARN.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. Testen Sie den Auslöser. Geben Sie dazu den folgenden Befehl ein, um `BarkTable` ein Element hinzuzufügen.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   Sie erhalten in wenigen Minuten eine neue E-Mail-Nachricht.

1. Öffnen Sie die DynamoDB-Konsole und fügen Sie `BarkTable` weitere Elemente hinzu. Sie müssen Werte für die Attribute `Username` und `Timestamp` angeben. (Sie sollten auch einen Wert für `Message` angeben, auch wenn er nicht erforderlich ist.) Sie erhalten eine neue E-Mail-Nachricht für jedes Element, das Sie `BarkTable` hinzufügen.

   Die Lambda-Funktion verarbeitet nur neue Elemente, die Sie `BarkTable` hinzufügen. Wenn Sie ein Element in der Tabelle aktualisieren oder löschen, hat die Funktion keine Auswirkung.

**Anmerkung**  
AWS Lambda schreibt Diagnoseinformationen in Amazon CloudWatch Logs. Wenn Fehler mit Ihrer Lambda-Funktion auftreten, können Sie folgende Diagnoseverfahren zur Fehlerbehebung verwenden.  
Öffnen Sie die CloudWatch Konsole unter [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Wählen Sie im Navigationsbereich **Logs** (Logs) aus.
Wählen Sie die folgende Protokollgruppe aus: `/aws/lambda/publishNewBark`
Wählen Sie den aktuellen Protokoll-Stream aus, um die Ausgabe (und Fehler) der Funktion zu sehen.

# Tutorial 2: Verwenden von Filtern, um einige Ereignisse mit DynamoDB und Lambda zu verarbeiten
<a name="Streams.Lambda.Tutorial2"></a>

In diesem Tutorial erstellen Sie einen AWS Lambda Trigger, um nur einige Ereignisse in einem Stream aus einer DynamoDB-Tabelle zu verarbeiten.

**Topics**
+ [

## Alles zusammenfügen - CloudFormation
](#Streams.Lambda.Tutorial2.Cloudformation)
+ [

## Zusammenführung – CDK
](#Streams.Lambda.Tutorial2.CDK)

Über die [Lambda-Ereignisfilterung](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) können Sie mithilfe von Filterausdrücken steuern, welche Ereignisse Lambda zur Verarbeitung an Ihre Funktion sendet. Sie können bis zu 5 verschiedene Filter pro DynamoDB-Streams konfigurieren. Wenn Sie Batching-Fenster verwenden, wendet Lambda die Filterkriterien auf jedes neue Ereignis an, um festzustellen, ob es dem aktuellen Batch hinzugefügt werden soll.

Filter werden über Strukturen, sogenannte `FilterCriteria`, angewendet. Die 3 Hauptattribute von `FilterCriteria` sind `metadata properties`, `data properties` und `filter patterns`. 

Hier ist eine Beispielstruktur eines DynamoDB-Streams-Ereignisses:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` sind die Felder des Ereignisobjekts. Im Falle von DynamoDB-Streams sind `metadata properties` Felder wie `dynamodb` oder `eventName`. 

`data properties` sind die Felder des Ereignistexts. Um nach `data properties` zu filtern, müssen sie in `FilterCriteria` im richtigen Schlüssel eingeschlossen sein. Für DynamoDB-Ereignisquellen lautet der Datenschlüssel `NewImage` oder `OldImage`.

Schließlich definieren Filterregeln den Filterausdruck, den Sie auf eine bestimmte Eigenschaft anwenden möchten. Hier sind einige Beispiele:


| Vergleichsoperator | Beispiel | Regelsyntax (partiell) | 
| --- | --- | --- | 
|  Null  |  Der Produkttyp ist null.  |  `{ "product_type": { "S": null } } `  | 
|  Leer  |  Der Produktname ist leer.  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Gleich  |  Der Bundesstaat ist Florida.  |  `{ "state": { "S": ["FL"] } } `  | 
|  Bedingung 2  |  Der Produktstatus ist Florida und die Produktkategorie „Chocolate“ (Schokolade).  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Oder  |  Der Produktstatus ist Florida oder Kalifornien.  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Nicht  |  Der Produktstatus ist nicht Florida.  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Vorhanden  |  Produkt „Homemade“ (Hausgemacht) ist vorhanden.  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  Nicht vorhanden  |  Produkt „Homemade“ (Hausgemacht) ist nicht vorhanden.  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Beginnt mit  |  PK beginnt mit COMPANY (Unternehmen).  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Sie können bis zu 5 Ereignisfilterungsmuster für eine Lambda-Funktion angeben. Beachten Sie, dass jedes dieser 5 Ereignisse als logisches ODER ausgewertet wird. Wenn Sie also zwei Filter namens `Filter_One` und `Filter_Two` konfigurieren, führt die Lambda-Funktion `Filter_One` ODER `Filter_Two` aus.

**Anmerkung**  
Auf der Seite [Lambda-Ereignisfilterung](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) sind einige Optionen zum Filtern und Vergleichen numerischer Werte vorhanden. Im Falle von DynamoDB-Filterereignissen gelten diese jedoch nicht, da Zahlen in DynamoDB als Zeichenfolgen gespeichert werden. Bei ` "quantity": { "N": "50" }` beispielsweise wissen wir aufgrund der Eigenschaft `"N"`, dass es sich um eine Zahl handelt.

## Alles zusammenfügen - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Um die Funktionalität der Ereignisfilterung in der Praxis zu demonstrieren, finden Sie hier eine CloudFormation Beispielvorlage. Diese Vorlage generiert eine einfache DynamoDB-Tabelle mit einem Partitionsschlüssel PK und einem Sortierschlüssel SK mit aktiviertem Amazon DynamoDB Streams. Sie erstellt eine Lambda-Funktion und eine einfache Lambda-Ausführungsrolle, die das Schreiben von Protokollen in Amazon Cloudwatch und das Lesen der Ereignisse aus dem Amazon-DynamoDB-Stream ermöglichen. Zudem fügt sie die Ereignisquellenzuordnung zwischen den DynamoDB-Streams und der Lambda-Funktion hinzu, sodass die Funktion bei jedem Ereignis im Amazon-DymamoDB-Stream ausgeführt werden kann.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Nachdem Sie diese Cloud-Formation-Vorlage bereitgestellt haben, können Sie das folgende Amazon-DynamoDB-Element einfügen:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Dank der einfachen Lambda-Funktion, die direkt in dieser Cloud-Formation-Vorlage enthalten ist, sehen Sie die Ereignisse in den CloudWatch Amazon-Protokollgruppen für die Lambda-Funktion wie folgt:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Filterbeispiele**
+ **Nur Produkte, die einem bestimmten Bundesstaat entsprechen**

In diesem Beispiel wird die CloudFormation Vorlage dahingehend geändert, dass sie einen Filter enthält, der allen Produkten aus Florida mit der Abkürzung „FL“ entspricht.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Wenn Sie den Stack erneut bereitstellen, können Sie das folgende DynamoDB-Element zur Tabelle hinzufügen. Beachten Sie, dass es nicht in den Lambda-Funktionsprotokollen angezeigt wird, da das Produkt in diesem Beispiel aus Kalifornien stammt.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Nur die Elemente, die mit einigen Werten im PK und SK beginnen**

In diesem Beispiel wird die CloudFormation Vorlage so geändert, dass sie die folgende Bedingung enthält:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Beachten Sie Folgendes: Die AND-Bedingung erfordert, dass sich die Bedingung innerhalb des Musters befindet, wobei sich die Schlüssel PK und SK in demselben Ausdruck, durch ein Komma getrennt, befinden.

Entweder mit einigen Werten für PK und SK beginnen oder stammt aus bestimmten Bundesstaat.

In diesem Beispiel wird die CloudFormation Vorlage so geändert, dass sie die folgenden Bedingungen enthält:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Beachten Sie, dass die OR-Bedingung durch Einführung neuer Muster in den Filterabschnitt hinzugefügt wird.

## Zusammenführung – CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

In der folgenden Beispielvorlage für die CDK-Projektbildung werden die Funktionen zum Filtern von Ereignissen veranschaulicht. Bevor Sie mit diesem CDK-Projekt arbeiten können, müssen Sie [die Voraussetzungen installieren](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html). Dies beinhaltet auch die [Ausführung von Vorbereitungsskripten](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Erstellen eines CDK-Projekts**

Erstellen Sie zunächst ein neues AWS CDK Projekt, indem Sie es `cdk init` in einem leeren Verzeichnis aufrufen.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

Im Befehl `cdk init` wird der Name des Projektordners zur Benennung verschiedener Elemente des Projekts verwendet, einschließlich Klassen, Unterordnern und Dateien. Bindestriche im Ordnernamen werden in Unterstriche umgewandelt. Ansonsten sollte der Name dem Format eines Python-Bezeichners folgen. Er sollte beispielsweise nicht mit einer Zahl beginnen und keine Leerzeichen enthalten.

Um mit dem neuen Projekt zu arbeiten, aktivieren Sie seine virtuelle Umgebung. Dadurch können die Abhängigkeiten des Projekts lokal im Projektordner installiert werden und müssen nicht global installiert werden.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**Anmerkung**  
Möglicherweise erkennen Sie dies als den Mac/Linux Befehl zur Aktivierung einer virtuellen Umgebung. Die Python-Vorlagen enthalten eine Batch-Datei, `source.bat`, die die Verwendung desselben Befehls unter Windows ermöglicht. Der traditionelle Windows-Befehl `.venv\Scripts\activate.bat` funktioniert ebenfalls. Wenn Sie Ihr AWS CDK Projekt mit AWS CDK Toolkit v1.70.0 oder früher initialisiert haben, befindet sich Ihre virtuelle Umgebung im `.env` Verzeichnis statt. `.venv` 

**Grundlegende Infrastruktur**

Öffnen Sie die Datei `./ddb_filters/ddb_filters_stack.py` in einem Texteditor Ihrer Wahl. Diese Datei wurde auto generiert, als Sie das AWS CDK Projekt erstellt haben. 

Fügen Sie als Nächstes die Funktionen `_create_ddb_table` und `_set_ddb_trigger_function` hinzu. Diese Funktionen erstellen eine DynamoDB-Tabelle mit dem Partitionsschlüssel PK und dem Sortierschlüssel SK im Bereitstellungsmodus/On-Demand-Modus, wobei Amazon DynamoDB Streams standardmäßig aktiviert ist, um neue und alte Bilder anzuzeigen.

Die Lambda-Funktion wird im Ordner `lambda` unter der Datei `app.py` gespeichert. Diese Datei wird später erstellt. Sie wird eine Umgebungsvariable, `APP_TABLE_NAME`, enthalten. Hierbei wird es sich um den Namen der Amazon-DynamoDB-Tabelle handeln, die von diesem Stack erstellt wurde. In derselben Funktion werden wir der Lambda-Funktion Stream-Leseberechtigungen erteilen. Schließlich wird sie die DynamoDB Streams als Ereignisquelle für die Lambda-Funktion abonnieren. 

Am Ende der Datei, in der `__init__`-Methode, werden Sie die entsprechenden Konstrukte aufrufen, um sie im Stack zu initialisieren. Bei größeren Projekten, die zusätzliche Komponenten und Services erfordern, ist es möglicherweise am besten, diese Konstrukte außerhalb des Basis-Stacks zu definieren. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Jetzt werden wir eine sehr einfache Lambda-Funktion erstellen, die die Protokolle in Amazon CloudWatch druckt. Erstellen Sie zu diesem Zweck einen neuen Ordner namens `lambda`.

```
mkdir lambda
touch app.py
```

Fügen Sie der Datei `app.py` über Ihren bevorzugten Texteditor den folgenden Inhalt hinzu:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Stellen Sie sicher, dass Sie sich im Ordner `/ddb_filters/` befinden, und geben Sie zum Erstellen der Beispielanwendung den folgenden Befehl ein:

```
cdk deploy
```

Irgendwann werden Sie aufgefordert, zu bestätigen, dass Sie die Lösung bereitstellen möchten. Akzeptieren Sie die Änderungen durch Eingabe von `Y`.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Sobald die Änderungen bereitgestellt sind, öffnen Sie Ihre AWS Konsole und fügen Sie Ihrer Tabelle ein Element hinzu. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Die CloudWatch Protokolle sollten jetzt alle Informationen aus diesem Eintrag enthalten. 

**Filterbeispiele**
+ **Nur Produkte, die einem bestimmten Bundesstaat entsprechen**

Öffnen Sie die Datei `ddb_filters/ddb_filters/ddb_filters_stack.py` und ändern Sie sie so, dass sie den Filter enthält, der alle Produkte abgleicht, die gleich „FL“ sind. Dies kann direkt unter der `event_subscription` in Zeile 45 geändert werden.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Nur die Elemente, die mit einigen Werten im PK und SK beginnen**

Ändern Sie das Python-Skript, um die folgende Bedingung aufzunehmen:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **Entweder mit einigen Werten für PK und SK beginnen oder stammt aus bestimmten Bundesstaat.**

Ändern Sie das Python-Skript, um die folgenden Bedingungen aufzunehmen:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Beachten Sie, dass die OR-Bedingung durch Hinzufügen weiterer Elemente zum Filter-Array hinzugefügt wird.

**Bereinigen**

Suchen Sie den Filter-Stack in der Basis Ihres Arbeitsverzeichnisses und führen Sie `cdk destroy` aus. Sie werden aufgefordert, das Löschen der Ressource zu bestätigen:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Bewährte Methoden zur Verwendung von DynamoDB Streams mit Lambda
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

Eine AWS Lambda Funktion wird in einem *Container* ausgeführt — einer Ausführungsumgebung, die von anderen Funktionen isoliert ist. Wenn Sie eine Funktion zum ersten Mal ausführen, AWS Lambda wird ein neuer Container erstellt und mit der Ausführung des Codes der Funktion begonnen.

Eine Lambda-Funktion hat einen *Handler*, der einmal pro Aufruf ausgeführt wird. Der Handler enthält die Hauptgeschäftslogik für die Funktion. Die Lambda-Funktion in [Schritt 4: Erstellen und Testen einer Lambda-Funktion](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) verfügt z. B. über einen Handler, der Datensätze in einem DynamoDB-Stream verarbeiten kann. 

Sie können auch Initialisierungscode bereitstellen, der nur einmal ausgeführt wird — nachdem der Container erstellt wurde, aber bevor der Handler zum ersten Mal AWS Lambda ausgeführt wird. Die unter gezeigte Lambda-Funktion [Schritt 4: Erstellen und Testen einer Lambda-Funktion](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) hat einen Initialisierungscode, der das SDK für JavaScript in Node.js importiert und einen Client für Amazon SNS erstellt. Diese Objekte sollten nur einmal außerhalb des Handlers definiert werden.

Nachdem die Funktion ausgeführt wurde, AWS Lambda können Sie sich dafür entscheiden, den Container für nachfolgende Aufrufe der Funktion wiederzuverwenden. In diesem Fall kann Ihr Funktions-Handler die Ressourcen, die Sie in Ihrem Initialisierungscode definiert haben, erneut nutzen. (Sie können nicht steuern, wie lange AWS Lambda den Container beibehält oder ob der Container überhaupt wiederverwendet wird.)

Für die Verwendung AWS Lambda von DynamoDB-Triggern empfehlen wir Folgendes:
+ AWS Service-Clients sollten im Initialisierungscode instanziiert werden, nicht im Handler. Dies ermöglicht AWS Lambda die Wiederverwendung vorhandener Verbindungen für die Dauer der Lebensdauer des Containers.
+ Im Allgemeinen müssen Sie Verbindungen nicht explizit verwalten oder Verbindungspooling implementieren, da AWS Lambda dies für Sie erledigt wird.

Ein Lambda-Verbraucher für einen DynamoDB-Stream garantiert nicht genau eine Lieferung und kann gelegentlich zu Duplikaten führen. Stellen Sie sicher, dass Ihr Lambda-Funktionscode idempotent ist, um zu verhindern, dass unerwartete Probleme aufgrund doppelter Verarbeitung auftreten.

Weitere Informationen finden Sie im *AWS Lambda Entwicklerhandbuch* unter [Bewährte Methoden für die Arbeit mit AWS Lambda Funktionen](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html).

# DynamoDB Streams und Apache Flink
<a name="StreamsApacheFlink.xml"></a>

Sie können Amazon-DynamoDB-Streams-Datensätze mit Apache Flink aufnehmen. Mit [Amazon Managed Service für Apache Flink](https://aws.amazon.com/managed-service-apache-flink/) lassen sich Streaming-Daten mithilfe von Apache Flink in Echtzeit umwandeln und analysieren. Apache Flink ist ein Open-Source-Verarbeitungsframework für Streams, mit dem Echtzeitdaten verarbeitet werden können. Der Amazon-DynamoDB-Streams-Connector für Apache Flink vereinfacht die Erstellung und Verwaltung von Apache Flink-Workloads. Zudem ermöglicht er die Integration von Anwendungen in andere AWS-Services.

Amazon Managed Service für Apache Flink hilft Ihnen dabei, schnell end-to-end Stream-Verarbeitungsanwendungen für Protokollanalysen, Clickstream-Analysen, Internet der Dinge (IoT), Werbetechnologie, Spiele und mehr zu erstellen. Die vier häufigsten Anwendungsfälle sind Streaming extract-transform-load (ETL), ereignisgesteuerte Anwendungen, reaktionsschnelle Echtzeitanalysen und interaktive Abfragen von Datenströmen. Weitere Informationen dazu, wie Sie aus Amazon DynamoDB Streams in Apache Flink schreiben, finden Sie unter [Amazon DynamoDB Streams Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/).