

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwendung AWS Lambda mit Amazon DynamoDB
<a name="with-ddb"></a>

**Anmerkung**  
Wenn Sie Daten an ein anderes Ziel als eine Lambda-Funktion senden oder die Daten vor dem Senden anreichern möchten, finden Sie weitere Informationen unter [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

Sie können eine AWS Lambda Funktion verwenden, um Datensätze in einem [Amazon DynamoDB DynamoDB-Stream](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) zu verarbeiten. Mit DynamoDB Streams können Sie eine Lambda-Funktion auslösen, damit jedes Mal, wenn eine DynamoDB-Tabelle aktualisiert wird, weitere Aufgaben ausgeführt werden.

Bei der Verarbeitung von DynamoDB-Streams müssen Sie eine Logik für teilweise Batch-Antworten implementieren, um zu verhindern, dass versucht wird, erfolgreich verarbeitete Datensätze erneut zu verarbeiten, wenn einige Datensätze in einem Batch fehlschlagen. Das [Batch Processor Utility](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) von Powertools for AWS Lambda ist in Python TypeScript, .NET und Java verfügbar und vereinfacht diese Implementierung, indem es automatisch die partielle Batch-Antwortlogik verarbeitet, wodurch die Entwicklungszeit reduziert und die Zuverlässigkeit verbessert wird.

**Topics**
+ [Abrufen und Stapeln von Streams](#dynamodb-polling-and-batching)
+ [Startpositionen für Abfragen und Streams](#dyanmo-db-stream-poll)
+ [Gleichzeitige Leser eines Shard in DynamoDB Streams](#events-dynamodb-simultaneous-readers)
+ [Beispielereignis](#events-sample-dynamodb)
+ [DynamoDB-Datensätze mit Lambda verarbeiten](services-dynamodb-eventsourcemapping.md)
+ [Konfigurieren einer teilweisen Batch-Antwort mit DynamoDB und Lambda](services-ddb-batchfailurereporting.md)
+ [Verworfene Datensätze für eine DynamoDB-Ereignisquelle in Lambda aufbewahren](services-dynamodb-errors.md)
+ [Implementierung der statusbehafteten DynamoDB-Stream-Verarbeitung in Lambda](services-ddb-windows.md)
+ [Lambdaparameter für Amazon DynamoDB-SQS-Zuordnungen von Ereignisquellen](services-ddb-params.md)
+ [Verwendung der Ereignisfilterung mit einer DynamoDB-Ereignisquelle](with-ddb-filtering.md)
+ [Tutorial: Verwendung AWS Lambda mit Amazon DynamoDB DynamoDB-Streams](with-ddb-example.md)

## Abrufen und Stapeln von Streams
<a name="dynamodb-polling-and-batching"></a>

Lambda fragt Shards in Ihrem DynamoDB-Stream nach Datensätzen bei einer Basisrate von viermal pro Sekunde ab. Sind Datensätze verfügbar, ruft Lambda Ihre Funktion auf und wartet auf das Ergebnis. Ist die Verarbeitung erfolgreich, setzt Lambda die Abrufe fort, bis es weitere Datensätze erhält.

Standardmäßig ruft Lambda Ihre Funktion auf, sobald Datensätze verfügbar sind. Wenn der Batch, den Lambda aus der Ereignisquelle liest, nur einen Datensatz enthält, sendet Lambda nur einen Datensatz an die Funktion. Damit die Funktion nicht mit einer kleinen Anzahl von Datensätzen aufgerufen wird, können Sie die Ereignisquelle anweisen, Datensätze bis zu 5 Minuten lang zu puffern, indem Sie ein *Batch-Fenster* konfigurieren. Bevor die Funktion aufgerufen wird, liest Lambda so lange Datensätze aus der Ereignisquelle, bis es einen vollständigen Batch erfasst hat, das Batch-Verarbeitungsfenster abläuft oder der Batch die Nutzlastgrenze von 6 MB erreicht. Weitere Informationen finden Sie unter [Batching-Verhalten](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**Warnung**  
Zuordnung von Lambda-Ereignisquellen verarbeiten jedes Ereignis mindestens einmal und es kann zu einer doppelten Verarbeitung von Datensätzen kommen. Um mögliche Probleme im Zusammenhang mit doppelten Ereignissen zu vermeiden, empfehlen wir Ihnen dringend, Ihren Funktionscode idempotent zu machen. Weitere Informationen finden Sie im Knowledge Center unter [Wie mache ich meine Lambda-Funktion idempotent](https://repost.aws/knowledge-center/lambda-function-idempotent)?. AWS 

Lambda wartet mit dem Senden des nächsten zu verarbeitenden Stapels nicht, bis ggf. konfigurierte [Erweiterungen](lambda-extensions.md) abgeschlossen sind. Anders ausgedrückt: Ihre Erweiterungen werden möglicherweise weiter ausgeführt, während Lambda den nächsten Stapel von Datensätzen verarbeitet. Dies kann zu Drosselungsproblemen führen, wenn Sie gegen eine Einstellung oder gegen einen Grenzwert im Zusammenhang mit der [Parallelität](lambda-concurrency.md) Ihres Kontos verstoßen. Um zu erkennen, ob möglicherweise ein Problem vorliegt, müssen Sie Ihre Funktionen überwachen sowie überprüfen, ob für Ihre Zuordnung von Ereignisquellen unerwartet hohe [Parallelitätsmetriken](monitoring-concurrency.md#general-concurrency-metrics) vorliegen. Aufgrund der kurzen Zeit zwischen den Aufrufen kann Lambda kurzzeitig eine höhere Gleichzeitigkeitsnutzung als die Anzahl der Shards melden. Dies kann sogar für Lambda-Funktionen ohne Erweiterungen gelten.

Konfigurieren Sie die [ ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)Einstellung so, dass ein Shard eines DynamoDB-Streams mit mehr als einem Lambda-Aufruf gleichzeitig verarbeitet wird. Sie können die Anzahl der gleichzeitigen Batches angeben, die Lambda von einem Shard über einen Parallelisierungsfaktor von 1 (Standard) bis 10 abfragt. Wenn `ParallelizationFactor` beispielsweise auf 2 gesetzt ist, können Sie maximal 200 gleichzeitige Lambda-Aufrufe haben, um 100 DynamoDB-Stream-Shards zu verarbeiten (in der Praxis werden womöglich andere Werte für die Metrik `ConcurrentExecutions` angezeigt). Dies hilft, den Verarbeitungsdurchsatz hochzuskalieren, wenn das Datenvolumen flüchtig ist und [IteratorAge](monitoring-metrics-types.md#performance-metrics) hoch ist. Wenn Sie die Anzahl der gleichzeitigen Stapel pro Shard erhöhen, sorgt Lambda weiterhin für eine ordnungsgemäße Verarbeitung auf der Ebene der Elemente (Partition und Sortierschlüssel).

## Startpositionen für Abfragen und Streams
<a name="dyanmo-db-stream-poll"></a>

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.
+ Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.
+ Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie `LATEST` als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Stream-Startposition als `TRIM_HORIZON` an.

## Gleichzeitige Leser eines Shard in DynamoDB Streams
<a name="events-dynamodb-simultaneous-readers"></a>

Für Einzelregionstabellen, bei denen es sich nicht um globale Tabellen handelt, können Sie bis zu zwei Lambda-Funktionen entwerfen, die gleichzeitig aus demselben DynamoDB-Streams-Shard lesen. Eine Überschreitung dieses Grenzwerts kann zu einer Anforderungsdrosselung führen. Für globale Tabellen empfehlen wir, die Anzahl der gleichzeitigen Funktionen auf eine zu beschränken, um eine Anforderungsdrosselung zu vermeiden.

## Beispielereignis
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# DynamoDB-Datensätze mit Lambda verarbeiten
<a name="services-dynamodb-eventsourcemapping"></a>

Erstellen Sie eine Ereignisquellenzuordnung, um Lambda anzuweisen, Datensätze aus Ihrem Stream an eine Lambda-Funktion zu senden. Sie können mehrere Ereignisquellen-Zuweisung erstellen, um gleiche Daten mit mehreren Lambda-Funktionen oder Elemente aus mehreren Streams mit nur einer Funktion zu verarbeiten.

Sie können Zuordnungen von Ereignisquellen konfigurieren, um Datensätze aus einem Stream in einem anderen AWS-Konto zu verarbeiten. Weitere Informationen hierzu finden Sie unter [Erstellen einer kontoübergreifenden Zuordnung von Ereignisquellen](#services-dynamodb-eventsourcemapping-cross-account).

**Um Ihre Funktion so zu konfigurieren, dass sie aus DynamoDB Streams liest, fügen Sie die AWS verwaltete [AWSLambdaDBExecutionDynamo-Rollenrichtlinie an Ihre Ausführungsrolle](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) an und erstellen Sie dann einen DynamoDB-Trigger.**

**So fügen Sie Berechtigungen hinzu und erstellen einen Auslöser**

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie den Namen einer Funktion aus.

1. Wählen Sie die Registerkarte **Konfiguration** und dann **Berechtigungen** aus.

1. Wählen Sie unter **Rollenname** den Link zu Ihrer Ausführungsrolle. Dieser Link öffnet die Rolle in der IAM-Konsole.  
![\[\]](http://docs.aws.amazon.com/de_de/lambda/latest/dg/images/execution-role.png)

1. Wählen Sie **Berechtigungen hinzufügen** aus und wählen Sie dann **Richtlinien direkt anhängen** aus.  
![\[\]](http://docs.aws.amazon.com/de_de/lambda/latest/dg/images/attach-policies.png)

1. Geben Sie im Suchfeld `AWSLambdaDynamoDBExecutionRole` ein. Fügen Sie diese Richtlinie zu Ihrer Ausführungsrolle hinzu. Dies ist eine AWS verwaltete Richtlinie, die die Berechtigungen enthält, die Ihre Funktion zum Lesen aus dem DynamoDB-Stream benötigt. Weitere Informationen zu dieser Richtlinie finden Sie unter [AWSLambdaDBExecutionDynamo-Rolle](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) in der Referenz für *AWS verwaltete* Richtlinien.

1. Gehen Sie zurück zu Ihrer Funktion in der Lambda-Konsole. Wählen Sie unter **Function overview (Funktionsübersicht)** die Option **Add trigger (Trigger hinzufügen)**.  
![\[\]](http://docs.aws.amazon.com/de_de/lambda/latest/dg/images/add-trigger.png)

1. Wählen Sie einen Auslösertyp aus.

1. Konfigurieren Sie die erforderlichen Optionen und wählen Sie dann **Add** (Hinzufügen) aus.

Lambda unterstützt die folgenden Optionen für DynamoDB-Ereignisquellen:

**Optionen für die Ereignisquelle**
+ **DynamoDB-Tabelle** – Die DynamoDB-Tabelle, aus der Datensätze gelesen werden sollen.
+ **Batchgröße** – Die Anzahl der Datensätze, die in jedem Batch bis zu 10.000 an die Funktion gesendet werden sollen. Lambda übergibt alle Datensätze im Batch in einem einzigen Aufruf an die Funktion, solange die Gesamtgröße der Ereignisse nicht das [Nutzlast-Limit](gettingstarted-limits.md) für synchrone Aufrufe überschreitet (6 MB).
+ **Batchfenster** – Geben Sie die maximale Zeitspanne zur Erfassung von Datensätzen vor dem Aufruf der Funktion in Sekunden an.
+ **Startposition** – Verarbeiten Sie nur neue Datensätze oder alle vorhandenen Datensätze.
  + **Neueste** – Verarbeiten Sie Datensätze, die neu zum Stream hinzugefügt wurden.
  + **Horizont trimmen** – Verarbeiten Sie alle Datensätze im Stream.

  Nach der Verarbeitung aller vorhandenen Datensätze hat die Funktion aufgeholt und setzt die Verarbeitung neuer Datensätze fort.
+ **On-failure destination (Ziel bei Ausfall)** – Eine SQS-Warteschlange oder ein SNS-Thema für Datensätze, die nicht verarbeitet werden können. Wenn Lambda einen Datensatz-Batch verwirft, weil er zu alt ist oder alle Wiederholungen erschöpft hat, sendet es Details zum Batch an die Warteschlange oder das Thema.
+ **Wiederholungsversuche** – Die maximale Anzahl von Wiederholungen von Lambda, wenn die Funktion einen Fehler zurückgibt. Dies gilt nicht für Servicefehler oder Drosselungen, bei denen der Batch die Funktion nicht erreicht hat.
+ **Höchstalter des Datensatzes** – Das maximale Alter eines Datensatzes, den Lambda an Ihre Funktion sendet.
+ **Batch bei Fehler aufteilen** – Wenn die Funktion einen Fehler zurückgibt, teilen Sie den Batch vor dem erneuten Versuch in zwei Teile. Ihre ursprüngliche Einstellung für die Batch-Größe bleibt unverändert.
+ **Gleichzeitige Batches pro Shard** – Verarbeitet gleichzeitig mehrere Batches aus demselben Shard.
+ **Aktiviert** – Auf „true“ festlegen, um die Ereignisquellenzuordnung zu aktivieren. Auf "false" festlegen, um die Verarbeitung von Datensätzen zu beenden. Lambda merkt sich den zuletzt verarbeiteten Datensatz und setzt die Verarbeitung nach erneuter Aktivierung der Zuordnung an dieser Stelle fort.

**Anmerkung**  
Für GetRecords API-Aufrufe, die von Lambda als Teil von DynamoDB-Triggern aufgerufen werden, fallen keine Gebühren an.

Um die Konfiguration der Ereignisquelle zu einem späteren Zeitpunkt zu verwalten, wählen Sie den Auslöser im Designer aus.

## Erstellen einer kontoübergreifenden Zuordnung von Ereignisquellen
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

Amazon DynamoDB unterstützt jetzt [ressourcenbasierte](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html) Richtlinien. Mit dieser Funktion können Sie Daten aus einem DynamoDB-Stream in einem AWS-Konto mit einer Lambda-Funktion in einem anderen Konto verarbeiten.

Um eine Ereignisquellenzuordnung für Ihre Lambda-Funktion mithilfe eines DynamoDB-Streams in einem anderen zu erstellen AWS-Konto, müssen Sie den Stream mithilfe einer ressourcenbasierten Richtlinie konfigurieren, um Ihrer Lambda-Funktion die Berechtigung zum Lesen von Datensätzen zu erteilen. Informationen dazu, wie Sie Ihren Stream für den kontoübergreifenden Zugriff konfigurieren, finden Sie unter [Zugriff mit kontoübergreifenden Lambda-Funktionen teilen](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access) im *Amazon DynamoDB DynamoDB-Entwicklerhandbuch*.

Nachdem Sie Ihren Stream mit einer ressourcenbasierten Richtlinie konfiguriert haben, die Ihrer Lambda-Funktion die erforderlichen Berechtigungen erteilt, erstellen Sie die Ereignisquellenzuordnung mit Ihrem kontoübergreifenden Stream-ARN. Sie finden den Stream-ARN auf der Registerkarte **Exporte und Streams** der Tabelle in der kontoübergreifenden DynamoDB-Konsole. 

Wenn Sie die Lambda-Konsole verwenden, fügen Sie den Stream-ARN direkt in das Eingabefeld der DynamoDB-Tabelle auf der Seite zur Erstellung der Ereignisquellenzuordnung ein.

 **Hinweis:** Regionsübergreifende Trigger werden nicht unterstützt. 

# Konfigurieren einer teilweisen Batch-Antwort mit DynamoDB und Lambda
<a name="services-ddb-batchfailurereporting"></a>

Beim Konsumieren und Verarbeiten von Streaming-Daten aus einer Ereignisquelle werden standardmäßig Lambda-Checkpoints auf die höchste Sequenznummer eines Batches nur dann überprüft, wenn der Batch ein voller Erfolg ist. Lambda behandelt alle anderen Ergebnisse als einen vollständigen Fehler und versucht, den Batch bis zum Wiederholungslimit zu verarbeiten. Um beim Verarbeiten von Stapeln aus einem Stream Teilerfolge zu ermöglichen, aktivieren Sie `ReportBatchItemFailures`. Das Zulassen von Teilerfolgen kann dazu beitragen, die Anzahl der Wiederholungen in einer Aufzeichnung zu reduzieren, obwohl die Möglichkeit von Wiederholungen in einer erfolgreichen Aufzeichnung nicht vollständig verhindert wird.

Zum Aktivieren von `ReportBatchItemFailures` fügen Sie den Enum-Wert **ReportBatchItemFailures** der [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)-Liste hinzu. Diese Liste zeigt an, welche Antworttypen für Ihre Funktion aktiviert sind. Sie können diese Liste konfigurieren, wenn Sie eine Zuordnung von Ereignisquellen [erstellen](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) oder [aktualisieren](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html).

**Anmerkung**  
Selbst wenn Ihr Funktionscode teilweise Antworten bei Batch-Fehlern zurückgibt, werden diese Antworten nicht von Lambda verarbeitet, es sei denn, das `ReportBatchItemFailures`-Feature ist explizit für Ihre Zuordnung von Ereignisquellen aktiviert.

## Berichtsyntax
<a name="streams-batchfailurereporting-syntax"></a>

Beim Konfigurieren von Berichten zu Batch-Elementfehlern wird die `StreamsEventResponse`-Klasse mit einer Liste von Batch-Elementfehlern zurückgegeben. Sie können ein `StreamsEventResponse`-Objekt verwenden, um die Sequenznummer des ersten fehlgeschlagenen Datensatzes im Batch zurückzugeben. Sie können auch Ihre eigene benutzerdefinierte Klasse mit der richtigen Antwortsyntax erstellen. Die folgende JSON-Struktur zeigt die erforderliche Antwortsyntax:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**Anmerkung**  
Wenn das `batchItemFailures`-Array mehrere Elemente enthält, verwendet Lambda den Datensatz mit der niedrigsten Sequenznummer als Kontrollpunkt. Lambda wiederholt dann alle Datensätze ab diesem Kontrollpunkt.

## Erfolgs- und Misserfolgsbedingungen
<a name="streams-batchfailurereporting-conditions"></a>

Lambda behandelt einen Batch als vollständigen Erfolg, wenn Sie eines der folgenden Elemente zurückgeben:
+ Eine leere `batchItemFailure`-Liste
+ Eine ungültige `batchItemFailure`-Liste
+ Ein leeres `EventResponse`
+ Ein ungültiges `EventResponse`

Lambda behandelt einen Batch als vollständigen Misserfolg, wenn Sie eines der folgenden Elemente zurückgeben:
+ Eine leere Zeichenfolge `itemIdentifier`
+ Ein ungültiges `itemIdentifier`
+ Ein `itemIdentifier` mit einem falschen Schlüsselnamen

Lambda wiederholt Fehler basierend auf Ihrer Wiederholungsstrategie.

## Einen Batch halbieren
<a name="streams-batchfailurereporting-bisect"></a>

Wenn Ihr Aufruf fehlschlägt und `BisectBatchOnFunctionError` eingeschaltet ist, wird der Stapel unabhängig von Ihrer `ReportBatchItemFailures`-Einstellung halbiert.

Wenn eine partielle Batch-Erfolgsantwort empfangen wird und sowohl `BisectBatchOnFunctionError` als auch `ReportBatchItemFailures` aktiviert sind, wird der Batch mit der zurückgegebenen Sequenznummer halbiert und Lambda versucht nur die verbleibenden Datensätze erneut.

Um die Implementierung der partiellen Batch-Antwortlogik zu vereinfachen, sollten Sie in Erwägung ziehen, das [Batch Processor Utility](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) von Powertools for zu verwenden AWS Lambda, das diese Komplexität automatisch für Sie erledigt.

Hier sind einige Beispiele für Funktionscode, der die Liste der fehlgeschlagenen Nachrichten IDs im Batch zurückgibt:

------
#### [ .NET ]

**SDK für .NET**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK für Go V2**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**SDK für Java 2.x**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**SDK für JavaScript (v3)**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von. JavaScript  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von. TypeScript  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**SDK für PHP**  
 Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von PHP.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK für Python (Boto3)**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK für Ruby**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von Ruby.  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**SDK für Rust**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling). 
Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von Rust.  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Powertools für die AWS Lambda Batch-Verarbeitung verwenden
<a name="services-ddb-batchfailurereporting-powertools"></a>

Das Batchverarbeitungsprogramm von Powertools for verarbeitet AWS Lambda automatisch die Logik für partielle Batch-Antworten und reduziert so die Komplexität der Implementierung von Batch-Fehlerberichten. Hier sind Beispiele für die Verwendung des Batch-Prozessors:

**Python**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Verarbeitung von DynamoDB-Stream-Datensätzen mit AWS Lambda Batch-Prozessor.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Verarbeitung von DynamoDB-Stream-Datensätzen mit AWS Lambda Batch-Prozessor.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Verarbeitung von DynamoDB-Stream-Datensätzen mit AWS Lambda Batch-Prozessor.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Verarbeitung von DynamoDB-Stream-Datensätzen mit AWS Lambda Batch-Prozessor.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Verworfene Datensätze für eine DynamoDB-Ereignisquelle in Lambda aufbewahren
<a name="services-dynamodb-errors"></a>

Die Fehlerbehandlung für DynamoDB-Zuordnungen von Ereignisquellen hängt davon ab, ob der Fehler vor dem Aufruf der Funktion oder während des Funktionsaufrufs auftritt:
+ **Vor dem Aufruf:** Wenn eine Lambda-Ereignisquellenzuordnung die Funktion aufgrund von Drosselung oder anderen Problemen nicht aufrufen kann, versucht sie es erneut, bis die Datensätze ablaufen oder das in der Ereignisquellenzuordnung () konfigurierte Höchstalter überschreiten. [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)
+ **Während des Aufrufs:** Wenn die Funktion aufgerufen wird, aber einen Fehler zurückgibt, versucht Lambda es erneut, bis die Datensätze ablaufen, das Höchstalter ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) überschreiten oder die konfigurierte Wiederholungsquote () erreicht haben. [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts) Bei Funktionsfehlern können Sie auch konfigurieren [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), dass ein fehlgeschlagener Batch in zwei kleinere Batches aufgeteilt wird, wodurch fehlerhafte Datensätze isoliert und Timeouts vermieden werden. Durch das Aufteilen von Batches wird die Quote für Wiederholungen nicht verbraucht.

Wenn die Fehlerbehandlungsmaßnahmen fehlschlagen, verwirft Lambda die Datensätze und setzt die Verarbeitung von Batches aus dem Stream fort. Bei den Standardeinstellungen bedeutet dies, dass ein fehlerhafter Datensatz die Verarbeitung auf dem betroffenen Shard für bis zu einen Tag blockieren kann. Um dies zu vermeiden, konfigurieren Sie die Ereignisquellenzuordnung Ihrer Funktion mit einer angemessenen Anzahl von Wiederholungen und einem maximalen Datensatzalter, das zu Ihrem Anwendungsfall passt.

## Konfigurieren von Zielen für fehlgeschlagene Aufrufe
<a name="dynamodb-on-failure-destination-console"></a>

Um Datensätze zu fehlgeschlagenen Aufrufen zur Zuordnung von Ereignisquellen beizubehalten, fügen Sie der Zuordnung von Ereignisquellen Ihrer Funktion ein Ziel hinzu. Jeder an das Ziel gesendete Datensatz ist ein JSON-Dokument mit Metadaten über den fehlgeschlagenen Aufruf. Bei Amazon S3-Zielen sendet Lambda auch den gesamten Aufrufdatensatz zusammen mit den Metadaten. Sie können jedes Amazon SNS SNS-Thema, jede Amazon SQS SQS-Warteschlange, jeden Amazon S3 S3-Bucket oder Kafka als Ziel konfigurieren.

Bei Amazon-S3-Zielen können Sie das Feature [Amazon S3 Event Notifications](https://docs.aws.amazon.com/) verwenden, um Benachrichtigungen zu erhalten, wenn Objekte in Ihren S3-Ziel-Bucket hochgeladen werden. Sie können S3-Ereignisbenachrichtigungen auch so konfigurieren, dass sie eine andere Lambda-Funktion aufrufen, um eine automatische Verarbeitung für fehlgeschlagene Stapel durchzuführen.

Ihre Ausführungsrolle muss über Berechtigungen für das Ziel verfügen:
+ **[Für ein SQS-Ziel: sqs: SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)**
+ **[Für ein SNS-Ziel: sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)**
+ **Für ein S3-Ziel: s3:** [und [s3: PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Für ein Kafka-Ziel: [kafka-cluster](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html):** WriteData

Sie können ein Kafka-Thema als Ziel für den Fall eines Fehlers für Ihre Quellenzuordnungen für Kafka-Ereignisse konfigurieren. Wenn Lambda Datensätze nach anstrengenden Wiederholungsversuchen nicht verarbeiten kann oder wenn Datensätze das Höchstalter überschreiten, sendet Lambda die fehlgeschlagenen Datensätze zur späteren Verarbeitung an das angegebene Kafka-Thema. Weitere Informationen finden Sie unter [Ein Kafka-Thema als Ziel für den Fall eines Fehlers verwenden](kafka-on-failure-destination.md).

[Wenn Sie die Verschlüsselung mit Ihrem eigenen KMS-Schlüssel für ein S3-Ziel aktiviert haben, muss die Ausführungsrolle Ihrer Funktion auch die Berechtigung haben, kms: aufzurufen. GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) Wenn sich der KMS-Schlüssel und das S3-Bucket-Ziel in einem anderen Konto als Ihre Lambda-Funktion und Ausführungsrolle befinden, konfigurieren Sie den KMS-Schlüssel so, dass er der Ausführungsrolle vertraut, die zugelassen kms: GenerateDataKey wird.

Gehen Sie folgendermaßen vor, um ein Ausfallziel mit der Konsole zu konfigurieren:

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie eine Funktion aus.

1. Wählen Sie unter **Function overview (Funktionsübersicht)** die Option **Add destination (Ziel hinzufügen)**.

1. Wählen Sie als **Quelle** die Option **Aufruf der Zuordnung von Ereignisquellen** aus.

1. Wählen Sie für die **Zuordnung von Ereignisquellen** eine Ereignisquelle aus, die für diese Funktion konfiguriert ist.

1. Wählen Sie für **Bedingung** die Option **Bei Ausfall** aus. Für Aufrufe zur Zuordnung von Ereignisquellen ist dies die einzig akzeptierte Bedingung.

1. Wählen Sie unter **Zieltyp** den Zieltyp aus, an den Lambda Aufrufdatensätze sendet.

1. Wählen Sie unter **Destination (Ziel)** eine Ressource aus.

1. Wählen Sie **Speichern**.

Sie können mit AWS Command Line Interface (AWS CLI) auch ein Ziel für den Fall eines Fehlers konfigurieren. Mit dem folgenden [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)Befehl wird beispielsweise eine Zuordnung der Ereignisquelle mit einem SQS-Ziel für den Fall eines Fehlers hinzugefügt: `MyFunction`

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

Der folgende [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html)Befehl aktualisiert eine Ereignisquellenzuordnung, sodass fehlgeschlagene Aufrufdatensätze nach zwei Wiederholungsversuchen oder wenn die Datensätze älter als eine Stunde sind, an ein SNS-Ziel gesendet werden.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

Aktualisierte Einstellungen werden asynchron angewendet und werden erst nach Abschluss des Vorgangs in der Ausgabe berücksichtigt. Verwenden Sie den [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html)-Befehl, um den aktuellen Status anzuzeigen.

Um ein Ziel zu entfernen, geben Sie eine leere Zeichenfolge als Argument für den `destination-config`-Parameter an:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Bewährte Methoden für die Sicherheit in Amazon S3-Zielen
<a name="ddb-s3-destination-security"></a>

Das Löschen eines S3-Buckets, der als Ziel konfiguriert ist, ohne das Ziel aus der Konfiguration Ihrer Funktion zu entfernen, kann ein Sicherheitsrisiko darstellen. Wenn ein anderer Benutzer den Namen Ihres Ziel-Buckets kennt, kann er den Bucket in seinem AWS-Konto neu erstellen. Aufzeichnungen über fehlgeschlagene Aufrufe werden an den entsprechenden Bucket gesendet, wodurch möglicherweise Daten aus Ihrer Funktion verfügbar gemacht werden.

**Warnung**  
Um sicherzustellen, dass Aufrufdatensätze Ihrer Funktion nicht an einen S3-Bucket in einem anderen gesendet werden können AWS-Konto, fügen Sie der Ausführungsrolle Ihrer Funktion eine Bedingung hinzu, die die `s3:PutObject` Berechtigungen auf Buckets in Ihrem Konto beschränkt. 

-Das folgende Beispiel zeigt eine IAM-Richtlinie, die die `s3:PutObject`-Berechtigungen Ihrer Funktion auf Buckets in Ihrem Konto beschränkt. Diese Richtlinie gibt Lambda auch die `s3:ListBucket`-Erlaubnis, einen S3-Bucket als Ziel zu verwenden.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Um der Ausführungsrolle Ihrer Funktion mithilfe von AWS-Managementkonsole oder eine Berechtigungsrichtlinie hinzuzufügen AWS CLI, lesen Sie die Anweisungen in den folgenden Verfahren:

------
#### [ Console ]

**So fügen Sie der Ausführungsrolle einer Funktion (Konsole) eine Berechtigungsrichtlinie hinzu**

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie die Lambda-Funktion aus, deren Ausführungsrolle Sie ändern möchten.

1. Klicken Sie in der Registerkarte **Konfiguration** auf die Option **Berechtigungen**.

1. Wählen Sie auf der Registerkarte **Ausführungsrolle** den **Rollennamen** Ihrer Funktion aus, um die IAM-Konsolenseite der Rolle zu öffnen.

1. Fügen Sie der Rolle wie folgt eine Richtlinie mit Berechtigungen hinzu:

   1. Wählen Sie im Bereich **Berechtigungsrichtlinien** die Optionen **Berechtigungen hinzufügen** und dann **Inline-Richtlinie erstellen** aus.

   1. Wählen Sie im **Richtlinien-Editor** **JSON** aus.

   1. Fügen Sie die Richtlinie, die Sie hinzufügen möchten, in den Editor ein (indem Sie die vorhandene JSON-Datei ersetzt) und wählen Sie dann **Weiter** aus.

   1. Geben Sie unter **Richtliniendetails** für den **Richtliniennamen** ein.

   1. Wählen Sie **Richtlinie erstellen** aus.

------
#### [ AWS CLI ]

**So fügen Sie der Ausführungsrolle einer Funktion (CLI) eine Berechtigungsrichtlinie hinzu**

1. Erstellen Sie ein JSON-Richtliniendokument mit den erforderlichen Berechtigungen und speichern Sie es in einem lokalen Verzeichnis.

1. Verwenden Sie den `put-role-policy` IAM-CLI-Befehl, um die Berechtigungen zur Ausführungsrolle Ihrer Funktion hinzuzufügen. Führen Sie den folgenden Befehl in dem Verzeichnis aus, in dem Sie Ihr JSON-Richtliniendokument gespeichert haben und ersetzen Sie den Rollennamen, den Richtliniennamen und das Richtliniendokument durch Ihre eigenen Werte.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Beispiel für einen Amazon SNS- und Amazon SQS-Aufrufsatz
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

Das folgende Beispiel zeigt einen Aufrufsatz, den Lambda an ein SQS- oder SNS-Ziel für einen DynamoDB-Stream sendet.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

Sie können diese Informationen verwenden, um die betroffenen Datensätze aus dem Stream für die Fehlersuche abzurufen. Die tatsächlichen Datensätze sind nicht enthalten, daher müssen Sie diesen Datensatz verarbeiten und aus dem Stream abrufen, bevor sie ablaufen und verloren gehen.

### Beispiel eines Amazon S3-Aufrufdatensatzes
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

Das folgende Beispiel zeigt einen Aufrufsatz, den Lambda für einen DynamoDB-Stream an einen S3-Bucket sendet. Zusätzlich zu allen Feldern aus dem vorherigen Beispiel für SQS- und SNS-Ziele enthält das Feld `payload` den ursprünglichen Aufrufdatensatz als maskierte JSON-Zeichenfolge.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

Das S3-Objekt, das den Aufrufdatensatz enthält, verwendet die folgende Namenskonvention:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Implementierung der statusbehafteten DynamoDB-Stream-Verarbeitung in Lambda
<a name="services-ddb-windows"></a>

Lambda-Funktionen können kontinuierliche Stream-Verarbeitungsanwendungen ausführen. Ein Stream entspricht einer unbegrenzten Menge von Daten, die kontinuierlich durch Ihre Anwendung fließen. Um Informationen aus dieser sich ständig aktualisierenden Eingabe zu analysieren, können Sie die enthaltenen Datensätze mithilfe eines zeitlich definierten Fensters binden.

Rollierende Fenster sind unterschiedliche Zeitfenster, die sich in regelmäßigen Abständen öffnen und schließen. Standardmäßig sind Lambda-Aufrufe zustandslos – Sie können sie nicht für die Verarbeitung von Daten über mehrere kontinuierliche Aufrufe hinweg ohne eine externe Datenbank verwenden. Mit rollierenden Fenstern können Sie jedoch Ihren Status über Aufrufe hinweg beibehalten. Dieser Zustand enthält das Gesamtergebnis der Nachrichten, die zuvor für das aktuelle Fenster verarbeitet wurden. Ihr Zustand kann maximal 1 MB pro Shard betragen. Wenn er diese Größe überschreitet, wird Lambda das Fenster vorzeitig beenden.

Jeder Datensatz in einem Stream gehört zu einem bestimmten Fenster. Lambda verarbeitet jeden Datensatz mindestens einmal, garantiert jedoch nicht, dass jeder Datensatz nur einmal verarbeitet wird. In seltenen Fällen, etwa bei der Fehlerbehandlung, werden einige Datensätze möglicherweise mehrmals verarbeitet. Datensätze werden beim ersten Mal immer in der richtigen Reihenfolge verarbeitet. Wenn Datensätze mehr als einmal verarbeitet werden, werden sie nicht in der richtigen Reihenfolge verarbeitet.

## Aggregation und Verarbeitung
<a name="streams-tumbling-processing"></a>

Ihre benutzerverwaltete Funktion wird sowohl zur Aggregation als auch zur Verarbeitung der Endergebnisse dieser Aggregation aufgerufen. Lambda aggregiert alle im Fenster empfangenen Datensätze. Sie können diese Datensätze in mehreren Stapeln erhalten, jeweils als ein separater Aufruf. Jeder Aufruf erhält einen Zustand. Wenn Sie also rollierende Fenster verwenden, muss Ihre Lambda-Funktionsantwort eine `state`-Eigenschaft enthalten. Wenn die Antwort keine `state`-Eigenschaft enthält, betrachtet Lambda dies als fehlgeschlagenen Aufruf. Um diese Bedingung zu erfüllen, kann Ihre Funktion ein `TimeWindowEventResponse`-Objekt zurückgeben, das die folgende JSON-Form aufweist:

**Example `TimeWindowEventResponse`-Werte**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**Anmerkung**  
Für Java-Funktionen empfehlen wir, eine `Map<String, String>` zu verwenden, um den Status darzustellen.

Am Ende des Fensters wird das Flag `isFinalInvokeForWindow` auf `true` gesetzt, um anzugeben, dass es sich um den Endzustand handelt und dass es für die Verarbeitung bereit ist. Nach der Verarbeitung werden das Fenster und Ihr endgültiger Aufruf wird abgeschlossen, und dann wird der Zustand gelöscht.

Am Ende Ihres Fensters verwendet Lambda die endgültige Verarbeitung für Aktionen an den Aggregationsergebnissen. Ihre endgültige Verarbeitung wird synchron aufgerufen. Nach erfolgreichem Aufruf zeigt Ihre Funktion auf die Sequenznummer und die Stream-Verarbeitung wird fortgesetzt. Wenn der Aufruf nicht erfolgreich ist, unterbricht Ihre Lambda-Funktion die weitere Verarbeitung bis zu einem erfolgreichen Aufruf.

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Konfiguration
<a name="streams-tumbling-config"></a>

Sie können rollierende Fenster konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren. Um ein Tumbling-Fenster zu konfigurieren, geben Sie das Fenster in Sekunden an (). [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds) Mit dem folgenden Beispielbefehl AWS Command Line Interface (AWS CLI) wird eine Quellenzuordnung für Streaming-Ereignisse erstellt, die ein Wechselfenster von 120 Sekunden hat. Die für Aggregation und Verarbeitung definierte Lambda-Funktion wird `tumbling-window-example-function` genannt.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda bestimmt die rollierenden Fenstergrenzen basierend auf dem Zeitpunkt, zu dem Datensätze in den Stream eingefügt wurden. Für alle Datensätze steht ein ungefährer Zeitstempel zur Verfügung, den Lambda in Grenzbestimmungen verwendet.

Rollierende Fensteraggregationen unterstützen kein Resharding. Wenn der Shard endet, betrachtet Lambda das Fenster als geschlossen und die untergeordneten Shards beginnen ihr eigenes Fenster in einem neuen Zustand.

Rollierende Fenster unterstützen vollständig die bestehenden Wiederholungsrichtlinien `maxRetryAttempts` und `maxRecordAge`.

**Example Handler.py – Aggregation und Verarbeitung**  
Die folgende Python-Funktion veranschaulicht, wie Sie Ihren Endzustand aggregieren und dann verarbeiten:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Lambdaparameter für Amazon DynamoDB-SQS-Zuordnungen von Ereignisquellen
<a name="services-ddb-params"></a>

Alle Lambda-Ereignisquelltypen verwenden dieselben [CreateEventSourceMapping[UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)API-Operationen. Allerdings gelten nur einige der Parameter für DynamoDB Streams.


| Parameter | Erforderlich | Standard | Hinweise | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  Höchstwert: 10 000.  | 
|  BisectBatchOnFunctionError  |  N  |  false  | Keine  | 
|  DestinationConfig  |  N  | –  |  Ein Ziel der Amazon-SQS-Standardwarteschlange oder des Amazon-SNS-Standardthemas für verworfene Datensätze  | 
|  Aktiviert  |  N  |  true  | Keine  | 
|  EventSourceArn  |  Y  | – |  Der ARN des Datenstroms oder eines Stream-Konsumenten  | 
|  FilterCriteria  |  N  | –  |  [Steuern Sie, welche Ereignisse Lambda an Ihre Funktion sendet](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  | –  | Keine  | 
|  FunctionResponseTypes  |  N  | – |  Damit Ihre Funktion bestimmte Fehler in einem Batch meldet, beziehen Sie den Wert `ReportBatchItemFailures` in `FunctionResponseTypes` ein. Weitere Informationen finden Sie unter [Konfigurieren einer teilweisen Batch-Antwort mit DynamoDB und Lambda](services-ddb-batchfailurereporting.md).  | 
|  MaximumBatchingWindowInSeconds  |  N  |  0  | Keine  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1  |  -1 bedeutet unendlich: Fehlgeschlagene Datensätze werden wiederholt, bis der Datensatz abläuft. Das [Datenaufbewahrungslimit für DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention) beträgt 24 Stunden. Minimum: -1 Höchstwert: 604 800  | 
|  MaximumRetryAttempts  |  N  |  -1  |  -1 bedeutet unendlich: Fehlgeschlagene Datensätze werden wiederholt, bis der Datensatz abläuft Minimum: 0 Höchstwert: 10 000.  | 
|  ParallelizationFactor  |  N  |  1  |  Maximum: 10  | 
|  StartingPosition  |  Y  | –  |  TRIM\$1HORIZON oder LATEST  | 
|  TumblingWindowInSeconds  |  N  | –  |  Minimum: 0 Maximum: 900  | 

# Verwendung der Ereignisfilterung mit einer DynamoDB-Ereignisquelle
<a name="with-ddb-filtering"></a>

Sie können die Ereignisfilterung verwenden, um zu steuern, welche Datensätze aus einem Stream oder einer Warteschlange Lambda an Ihre Funktion sendet. Allgemeine Informationen über die Funktionsweise der Ereignisfilterung finden Sie unter [Steuern Sie, welche Ereignisse Lambda an Ihre Funktion sendet](invocation-eventfiltering.md).

Dieser Abschnitt konzentriert sich auf die Ereignisfilterung für DynamoDB-Ereignisquellen.

**Anmerkung**  
DynamoDB-Zuordnungen von Ereignisquellen unterstützen nur das Filtern nach dem `dynamodb`-Schlüssel.

**Topics**
+ [DynamoDB-Ereignis](#filtering-ddb)
+ [Filterung mit Tabellenattributen](#filtering-ddb-attributes)
+ [Filterung mit booleschen Ausdrücken](#filtering-ddb-boolean)
+ [Verwenden des Exists-Operators](#filtering-ddb-exists)
+ [JSON-Format für DynamoDB-Filterung](#filtering-ddb-JSON-format)

## DynamoDB-Ereignis
<a name="filtering-ddb"></a>

Angenommen, Sie haben eine DynamoDB-Tabelle mit dem Primärschlüssel `CustomerName` und den Attributen `AccountManager` und `PaymentTerms`. Im Folgenden sehen Sie einen Beispieldatensatz aus dem Stream Ihrer DynamoDB-Tabelle.

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

Um anhand der Schlüssel- und Attributwerte in Ihrer DynamoDB-Tabelle zu filtern, verwenden Sie den `dynamodb`-Schlüssel im Datensatz. Die folgenden Abschnitte enthalten Beispiele für verschiedene Filtertypen.

### Filtern mit Tabellenschlüsseln
<a name="filtering-ddb-keys"></a>

Angenommen, Sie möchten, dass Ihre Funktion nur die Datensätze verarbeitet, deren Primärschlüssel `CustomerName` "AnyCompany Industries" lautet. Das `FilterCriteria`-Objekt würde wie folgt aussehen.

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

Zur Verdeutlichung sehen Sie hier den Wert des Filter-`Pattern` in reinem JSON. 

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

Sie können Ihren Filter mithilfe der Konsole, AWS CLI oder einer AWS SAM-Vorlage hinzufügen.

------
#### [ Console ]

Um diesen Filter mithilfe der Konsole hinzuzufügen, folgen Sie den Anweisungen unter [Anhängen von Filterkriterien an eine Ereignisquellenzuordnung (Konsole)](invocation-eventfiltering.md#filtering-console) und geben Sie die folgende Zeichenfolge für die **Filterkriterien** ein.

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

Um eine neue Zuordnung von Ereignisquellen mit diesen Filterkriterien über die AWS Command Line Interface (AWS CLI) zu erstellen, führen Sie den folgenden Befehl aus.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

Führen Sie den folgenden Befehl aus, um diese Filterkriterien zu einer vorhandenen Zuordnung von Ereignisquellen hinzuzufügen.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

Um diesen Filter mit der AWS SAM hinzuzufügen, fügen Sie das folgende Snippet zur YAML-Vorlage für Ihre Ereignisquelle hinzu.

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## Filterung mit Tabellenattributen
<a name="filtering-ddb-attributes"></a>

Mit DynamoDB können Sie auch die Schlüssel `NewImage` und `OldImage` verwenden, um nach Attributwerten zu filtern. Angenommen, Sie möchten Datensätze filtern, bei denen das `AccountManager`-Attribut im letzten Tabellenbild "Pat Candella" oder "Shirley Rodriguez" lautet. Das `FilterCriteria`-Objekt würde wie folgt aussehen.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

Zur Verdeutlichung sehen Sie hier den Wert des Filter-`Pattern` in reinem JSON.

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

Sie können Ihren Filter mithilfe der Konsole, AWS CLI oder einer AWS SAM-Vorlage hinzufügen.

------
#### [ Console ]

Um diesen Filter mithilfe der Konsole hinzuzufügen, folgen Sie den Anweisungen unter [Anhängen von Filterkriterien an eine Ereignisquellenzuordnung (Konsole)](invocation-eventfiltering.md#filtering-console) und geben Sie die folgende Zeichenfolge für die **Filterkriterien** ein.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

Um eine neue Zuordnung von Ereignisquellen mit diesen Filterkriterien über die AWS Command Line Interface (AWS CLI) zu erstellen, führen Sie den folgenden Befehl aus.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

Führen Sie den folgenden Befehl aus, um diese Filterkriterien zu einer vorhandenen Zuordnung von Ereignisquellen hinzuzufügen.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

Um diesen Filter mit der AWS SAM hinzuzufügen, fügen Sie das folgende Snippet zur YAML-Vorlage für Ihre Ereignisquelle hinzu.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## Filterung mit booleschen Ausdrücken
<a name="filtering-ddb-boolean"></a>

Sie können Filter auch mithilfe von booleschen UND-Ausdrücken erstellen. Diese Ausdrücke können sowohl die Schlüssel- als auch die Attributparameter Ihrer Tabelle enthalten. Angenommen, Sie möchten Datensätze filtern, bei denen der `NewImage`-Wert von `AccountManager` „Pat Candella“ und der `OldImage`-Wert „Terry Whitlock“ ist. Das `FilterCriteria`-Objekt würde wie folgt aussehen.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

Zur Verdeutlichung sehen Sie hier den Wert des Filter-`Pattern` in reinem JSON.

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

Sie können Ihren Filter mithilfe der Konsole, AWS CLI oder einer AWS SAM-Vorlage hinzufügen.

------
#### [ Console ]

Um diesen Filter mithilfe der Konsole hinzuzufügen, folgen Sie den Anweisungen unter [Anhängen von Filterkriterien an eine Ereignisquellenzuordnung (Konsole)](invocation-eventfiltering.md#filtering-console) und geben Sie die folgende Zeichenfolge für die **Filterkriterien** ein.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

Um eine neue Zuordnung von Ereignisquellen mit diesen Filterkriterien über die AWS Command Line Interface (AWS CLI) zu erstellen, führen Sie den folgenden Befehl aus.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

Führen Sie den folgenden Befehl aus, um diese Filterkriterien zu einer vorhandenen Zuordnung von Ereignisquellen hinzuzufügen.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

Um diesen Filter mit der AWS SAM hinzuzufügen, fügen Sie das folgende Snippet zur YAML-Vorlage für Ihre Ereignisquelle hinzu.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**Anmerkung**  
Die DynamoDB-Ereignisfilterung unterstützt nicht die Verwendung von numerischen Operatoren (numerisch gleich und numerisch Bereich). Auch wenn Elemente in Ihrer Tabelle als Zahlen gespeichert sind, werden diese Parameter im JSON-Datensatzobjekt in Strings umgewandelt.

## Verwenden des Exists-Operators
<a name="filtering-ddb-exists"></a>

Aufgrund der Struktur von JSON-Ereignisobjekten aus DynamoDB ist bei der Verwendung des Operators „Exists“ besondere Vorsicht geboten. Der Exists-Operator funktioniert nur bei Blattknoten im Ereignis-JSON. Wenn Ihr Filtermuster also Exists verwendet, um nach einem Zwischenknoten zu suchen, wird es nicht funktionieren. Betrachten Sie das folgende DynamoDB-Tabellenelement:

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

Möglicherweise möchten Sie ein Filtermuster wie das folgende erstellen, das nach Ereignissen sucht, die `"Organizations"` enthalten:

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

Dieses Filtermuster würde jedoch nie eine Übereinstimmung ergeben, da `"Organizations"` kein Blattknoten ist. Das folgende Beispiel zeigt, wie der Operator „Exists“ richtig verwendet wird, um das gewünschte Filtermuster zu erstellen:

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## JSON-Format für DynamoDB-Filterung
<a name="filtering-ddb-JSON-format"></a>

Um Ereignisse aus DynamoDB-Quellen richtig zu filtern, müssen sowohl das Datenfeld als auch Ihre Filterkriterien für das Datenfeld (`dynamodb`) im gültigen JSON-Format vorliegen. Wenn eines der Felder kein gültiges JSON-Format hat, verwirft Lambda die Nachricht oder gibt eine Ausnahme aus. In der folgenden Tabelle ist das Verhalten zusammengefasst: 


| Format der eingehenden Daten | Filtermusterformat für Dateneigenschaften | Resultierende Aktion | 
| --- | --- | --- | 
|  Gültiges JSON  |  Gültiges JSON  |  Lambda filtert basierend auf Ihren Filterkriterien.  | 
|  Gültiges JSON  |  Kein Filtermuster für Dateneigenschaften  |  Lambda filtert (nur für die anderen Metadateneigenschaften) basierend auf Ihren Filterkriterien.  | 
|  Gültiges JSON  |  Kein JSON  |  Lambda gibt zum Zeitpunkt der Erstellung oder Aktualisierung der Ereignisquellenzuordnung eine Ausnahme aus. Das Filtermuster für Dateneigenschaften muss ein gültiges JSON-Format haben.  | 
|  Kein JSON  |  Gültiges JSON  |  Lambda verwirft den Datensatz.  | 
|  Kein JSON  |  Kein Filtermuster für Dateneigenschaften  |  Lambda filtert (nur für die anderen Metadateneigenschaften) basierend auf Ihren Filterkriterien.  | 
|  Kein JSON  |  Kein JSON  |  Lambda gibt zum Zeitpunkt der Erstellung oder Aktualisierung der Ereignisquellenzuordnung eine Ausnahme aus. Das Filtermuster für Dateneigenschaften muss ein gültiges JSON-Format haben.  | 

# Tutorial: Verwendung AWS Lambda mit Amazon DynamoDB DynamoDB-Streams
<a name="with-ddb-example"></a>

 In diesem Tutorial erstellen Sie eine Lambda-Funktion zum Verarbeiten von Ereignissen aus einem Amazon-DynamoDB-Stream.

## Voraussetzungen
<a name="with-ddb-prepare"></a>

### Installieren Sie das AWS Command Line Interface
<a name="install_aws_cli"></a>

Wenn Sie das noch nicht installiert haben AWS Command Line Interface, folgen Sie den Schritten unter [Installieren oder Aktualisieren der neuesten Version von AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html), um es zu installieren.

Das Tutorial erfordert zum Ausführen von Befehlen ein Befehlszeilenterminal oder eine Shell. Verwenden Sie unter Linux und macOS Ihre bevorzugte Shell und Ihren bevorzugten Paketmanager.

**Anmerkung**  
In Windows werden einige Bash-CLI-Befehle, die Sie häufig mit Lambda verwenden (z. B. `zip`), von den integrierten Terminals des Betriebssystems nicht unterstützt. Um eine in Windows integrierte Version von Ubuntu und Bash zu erhalten, [installieren Sie das Windows-Subsystem für Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Erstellen der Ausführungsrolle
<a name="with-ddb-create-execution-role"></a>

Erstellen Sie die [Ausführungsrolle](lambda-intro-execution-role.md), die Ihrer Funktion die Berechtigung zum Zugriff auf AWS Ressourcen erteilt.

**So erstellen Sie eine Ausführungsrolle**

1. Öffnen Sie die Seite [Roles (Rollen)](https://console.aws.amazon.com/iam/home#/roles) in der IAM-Konsole.

1. Wählen Sie **Rolle erstellen** aus.

1. Erstellen Sie eine Rolle mit den folgenden Eigenschaften.
   + **Vertrauenswürdige Entität** – Lambda.
   + **Berechtigungen** — **AWSLambdaDBExecutionDynamo-Rolle**.
   + **Role name (Name der Rolle** – **lambda-dynamodb-role**.

Die **AWSLambdaDBExecutionDynamo-Rolle** verfügt über die Berechtigungen, die die Funktion benötigt, um Elemente aus DynamoDB zu lesen und Protokolle in Logs zu schreiben. CloudWatch 

## Erstellen der Funktion
<a name="with-ddb-example-create-function"></a>

Erstellen Sie eine Lambda-Funktion, die Ihre DynamoDB-Ereignisse verarbeitet. Der Funktionscode schreibt einige der eingehenden Ereignisdaten in Logs. CloudWatch 

------
#### [ .NET ]

**SDK für .NET**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK für Go V2**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**SDK für Java 2.x**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von Java.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK für JavaScript (v3)**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Konsumieren eines DynamoDB-Ereignisses mit Lambda unter Verwendung. JavaScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
Konsumieren eines DynamoDB-Ereignisses mit Lambda unter Verwendung. TypeScript  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**SDK für PHP**  
 Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von PHP.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK für Python (Boto3)**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von Python.  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK für Ruby**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von Ruby.  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**SDK für Rust**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda). 
Nutzen eines DynamoDB-Ereignisses mit Lambda unter Verwendung von Rust.  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**So erstellen Sie die Funktion**

1. Kopieren Sie den Beispiel-Code in eine Datei mit dem Namen `example.js`.

1. Erstellen Sie ein Bereitstellungspaket.

   ```
   zip function.zip example.js
   ```

1. Erstellen Sie eine Lambda-Funktion mit dem Befehl `create-function`.

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## Lambda-Funktion testen
<a name="with-dbb-invoke-manually"></a>

In diesem Schritt rufen Sie Ihre Lambda-Funktion manuell mit dem `invoke` AWS Lambda CLI-Befehl und dem folgenden DynamoDB-Beispielereignis auf. Kopieren Sie den folgenden Code in eine Datei mit dem Namen `input.txt`.

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

Führen Sie den Befehl `invoke` aus. 

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

Die **cli-binary-format** Option ist erforderlich, wenn Sie Version 2 verwenden. AWS CLI Um dies zur Standardeinstellung zu machen, führen Sie `aws configure set cli-binary-format raw-in-base64-out` aus. Weitere Informationen finden Sie unter [Von AWS CLI unterstützte globale Befehlszeilenoptionen](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) im *AWS Command Line Interface -Benutzerhandbuch für Version 2*.

Die Funktion gibt die Zeichenfolge `message` im Antworttext zurück. 

Überprüfen Sie die Ausgabe in der Datei `outputfile.txt`.

## Erstellen einer DynamoDB-Tabelle mit einem aktivierten Stream
<a name="with-ddb-create-buckets"></a>

Erstellen einer Amazon-DynamoDB-Tabelle mit einem aktivierten Stream.

**So erstellen Sie eine DynamoDB-Tabelle**

1. Öffnen Sie die [DynamoDB-Konsole](https://console.aws.amazon.com/dynamodb).

1. Wählen Sie **Create table** aus.

1. Erstellen Sie eine Tabelle mit den folgenden Einstellungen:
   + **Tabellenname** – **lambda-dynamodb-stream**
   + **Primärschlüssel** – **id** (Zeichenfolge)

1. Wählen Sie **Erstellen** aus.

**So aktivieren Sie Streams**

1. Öffnen Sie die [DynamoDB-Konsole](https://console.aws.amazon.com/dynamodb).

1. Wählen Sie **Tables (Tabellen)** aus.

1. Wählen Sie die **lambda-dynamodb-stream** Tabelle aus. 

1. Wählen Sie unter **Exports and streams** (Exporte und Streams) die Option **DynamoDB stream details** (Details zu DynamoDB-Streams) aus.

1. Wählen Sie **Turn on** (Einschalten) aus.

1. Wählen Sie als **Ansichtstyp** die Option **Nur Schlüsselattribute** aus.

1. Wählen Sie **Stream einschalten**.

Notieren Sie sich den Stream-ARN. Sie benötigen diesen im nächsten Schritt, um die Lambda-Funktion dem Stream zuzuordnen. Weitere Informationen zur Aktivierung von Streams finden Sie unter [Erfassen von Tabellenaktivitäten mit DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).

## Fügen Sie eine Ereignisquelle hinzu in AWS Lambda
<a name="with-ddb-attach-notification-configuration"></a>

Erstellen Sie eine Ereignisquellenzuordnung in AWS Lambda. Diese Ereignisquellenzuordnung ordnet den DynamoDB-Stream Ihrer Lambda-Funktion zu. Nachdem Sie diese Ereignisquellenzuordnung erstellt haben, AWS Lambda beginnt die Abfrage des Streams.

Führen Sie den Befehl AWS CLI `create-event-source-mapping` aus. Notieren Sie sich die UUID, nachdem Sie den Befehl ausgeführt haben. Sie benötigen diese UUID, um in Befehlen auf die Ereignisquellenzuordnung zu verweisen (beispielsweise beim Löschen der Ereignisquellen-Zuweisung).

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 Dadurch wird eine Zuweisung zwischen dem angegebenen DynamoDB-Stream und der Lambda-Funktion erstellt. Sie können einen DynamoDB-Stream mehreren Lambda-Funktionen und dieselbe Lambda-Funktion mehreren Streams zuordnen. Die Lambda-Funktionen teilen jedoch den Lesedurchsatz für den geteilten Stream. 

Sie erhalten die Liste der Ereignisquellen-Zuweisungen, indem Sie folgenden Befehl ausführen.

```
aws lambda list-event-source-mappings
```

Die Liste gibt alle von Ihnen erstellten Ereignisquellenzuordnungen zurück und für jede Zuordnung zeigt sie u. a. den `LastProcessingResult`. Dieses Feld wird verwendet, um eine informative Meldung bereitzustellen, wenn Probleme auftreten. Werte wie `No records processed` (gibt an, dass die Abfrage nicht gestartet AWS Lambda wurde oder dass der Stream keine Datensätze enthält) und `OK` (gibt an, dass Datensätze AWS Lambda erfolgreich aus dem Stream gelesen und Ihre Lambda-Funktion aufgerufen wurden) zeigen an, dass keine Probleme vorliegen. Bei Problemen erhalten Sie eine Fehlermeldung.

Bei einer großen Anzahl von Ereignisquellen-Zuweisungen verwenden Sie den Funktionsnamen-Parameter, um die Ergebnisse einzugrenzen.

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## Testen der Einrichtung
<a name="with-ddb-final-integration-test-no-iam"></a>

Testen Sie das Erlebnis. end-to-end Beim Aktualisieren der Tabelle schreibt DynamoDB Ereignisdatensätze in den Stream. Da AWS Lambda den Stream abfragt, erkennt es neue Datensätze im Stream und ruft Ihre Lambda-Funktion für Sie auf, indem Ereignisse an die Funktion übergeben werden. 

1. Fügen Sie in der DynamoDB-Konsole Elemente hinzu, aktualisieren und löschen Sie diese und fügen Sie diese hinzu. DynamoDB schreibt Datensätze dieser Aktionen in den Stream.

1. AWS Lambda fragt den Stream ab und wenn es Aktualisierungen am Stream erkennt, ruft es Ihre Lambda-Funktion auf, indem es die im Stream gefundenen Ereignisdaten weitergibt.

1. Ihre Funktion wird in Amazon ausgeführt und erstellt Protokolle CloudWatch. Sie können die in der CloudWatch Amazon-Konsole gemeldeten Protokolle überprüfen.

## Nächste Schritte
<a name="with-ddb-next-steps"></a>

In diesem Tutorial haben Sie die Grundlagen der Verarbeitung von DynamoDB-Stream-Ereignissen mit Lambda kennengelernt. Bei Produktions-Workloads sollten Sie erwägen, eine Logik für teilweise Batch-Antworten zu implementieren, um einzelne Datensatzfehler effizienter zu handhaben. Das [Batchverarbeitungsprogramm](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) von Powertools for AWS Lambda ist in Python TypeScript, .NET und Java verfügbar und bietet hierfür eine robuste Lösung, die die Komplexität von partiellen Batch-Antworten automatisch handhabt und die Anzahl der Wiederholungen für erfolgreich verarbeitete Datensätze reduziert.

## Bereinigen Ihrer Ressourcen
<a name="cleanup"></a>

Sie können jetzt die Ressourcen, die Sie für dieses Tutorial erstellt haben, löschen, es sei denn, Sie möchten sie behalten. Durch das Löschen von AWS Ressourcen, die Sie nicht mehr verwenden, vermeiden Sie unnötige Kosten für Ihre. AWS-Konto

**So löschen Sie die Lambda-Funktion:**

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie die Funktion aus, die Sie erstellt haben.

1. Wählen Sie **Aktionen**, **Löschen** aus.

1. Geben Sie **confirm** in das Texteingabefeld ein und wählen Sie **Delete** (Löschen) aus.

**So löschen Sie die Ausführungsrolle**

1. Öffnen Sie die Seite [Roles](https://console.aws.amazon.com/iam/home#/roles) in der IAM-Konsole.

1. Wählen Sie die von Ihnen erstellte Ausführungsrolle aus.

1. Wählen Sie **Löschen** aus.

1. Geben Sie den Namen der Rolle in das Texteingabefeld ein und wählen Sie **Delete** (Löschen) aus.

**So löschen Sie die DynamoDB-Tabelle**

1. Öffnen Sie die Seite [Tables (Tabellen)](https://console.aws.amazon.com//dynamodb/home#tables:) in der DynamoDB-Konsole.

1. Wählen Sie die von Ihnen erstellte Tabelle aus.

1. Wählen Sie **Löschen** aus.

1. Geben Sie **delete** in das Textfeld ein.

1. Wählen Sie **Delete Table** (Tabelle löschen).