

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verarbeiten von Datensätzen aus Amazon Kinesis Data Streams mit Lambda
<a name="with-kinesis"></a>

Sie können eine Lambda-Funktion verwenden, um Datensätze in einem [Amazon Kinesis-Datenstrom](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) zu verarbeiten. Sie können eine Lambda-Funktion zu einem Kinesis Data Streams Konsumenten mit gemeinsam genutztem Durchsatz (Standard-Iterator) oder zu einem Konsumenten mit dediziertem Durchsatz mit [erweitertem Rundsenden](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html) zuweisen. Bei Standard-Iteratoren fragt Lambda jeden Shard in Ihrem Kinesis-Stream nach Datensätzen ab, die das HTTP-Protokoll verwenden. Die Ereignisquellenzuordnung teilt den Lesedurchsatz mit anderen Konsumenten des Shards zusammen.

 Weitere Informationen zu Kinesis-Datenströmen finden Sie unter [Daten aus Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html).

**Anmerkung**  
Kinesis berechnet Gebühren für jeden Shard, sowie bei verbessertem Rundsenden für Daten, die aus dem Stream gelesen werden. Details zu den Preisen finden Sie unter [Amazon-Kinesis- Preise](https://aws.amazon.com/kinesis/data-streams/pricing).

## Abfragen und Stapeln von Streams
<a name="kinesis-polling-and-batching"></a>

Lambda liest Datensätze aus dem Datenstrom und ruft Ihre Funktion [synchron](invocation-sync.md) mit einem Ereignis auf, das Stream-Datensätze enthält. Lambda liest Datensätze in Batches und ruft Ihre Funktion auf, um Datensätze aus dem Batch zu verarbeiten. Jeder Batch enthält Datensätze aus einem einzelnen Shard/Datenstrom.

Ihre Lambda-Funktion ist eine Konsumentenanwendung für Ihren Daten-Stream. Sie verarbeitet jeweils einen Batch Datensätzen aus jedem Shard. Sie können eine Lambda-Funktion zu einem Konsumenten mit gemeinsam genutztem Durchsatz (Standard-Iterator) oder zu einem Konsumenten mit dediziertem Durchsatz mit erweitertem Rundsenden zuweisen.
+ **Standard-Iterator:** Lambda fragt jeden Shard in Ihrem Kinesis-Stream mit einer Basisrate von einmal pro Sekunde nach Datensätzen ab. Wenn mehr Datensätze verfügbar sind, verarbeitet Lambda Batches, bis die Funktion mit dem Stream gleichzieht. Die Ereignisquellenzuordnung teilt den Lesedurchsatz mit anderen Konsumenten des Shards zusammen.
+ **Erweitertes Rundsenden:** Um die Latenz zu minimieren und den Lesedurchsatz zu maximieren, erstellen Sie einen Daten-Stream-Konsumenten mit [erweitertem Rundsenden](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html). Stream-Konsumenten mit erweitertem Rundsenden erhalten eine dedizierte Verbindung für jeden Shard, der keine Auswirkungen auf andere Anwendungen hat, die aus dem Stream lesen. Stream-Konsumenten verwenden HTTP/2, um die Latenz zu reduzieren, indem Datensätze über eine langlebige Verbindung an Lambda übertragen und Anforderungs-Header komprimiert werden. Sie können einen Stream-Konsumenten mit der Kinesis-[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)-API erstellen.

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

Die Ausgabe sollte folgendermaßen aussehen:

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

Um die Geschwindigkeit zu erhöhen, mit der Ihre Funktion Datensätze verarbeitet, [fügen Sie Ihrem Datenstrom Shards hinzu](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards). Lambda verarbeitet Datensätze in jedem Shard in der Reihenfolge. Es beendet die Verarbeitung zusätzlicher Datensätze in einem Shard, wenn Ihre Funktion einen Fehler zurückgibt. Mehr Shards bedeutet, dass mehr Stapel verarbeitet und gleichzeitig die Auswirkungen von Fehlern auf die Nebenläufigkeit verringert werden.

Wenn Ihre Funktion nicht hochskalieren kann, um alle gleichzeitigen Stapel zu verarbeiten, [fordern Sie eine Kontingenterhöhung an](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html) oder [reservieren Sie Gleichzeitigkeit](configuration-concurrency.md) für Ihre Funktion.

Standardmäßig ruft Lambda Ihre Funktion auf, sobald Datensätze verfügbar sind. Wenn der Batch, den Lambda aus der Ereignisquelle liest, nur einen Datensatz enthält, sendet Lambda nur einen Datensatz an die Funktion. Damit die Funktion nicht mit einer kleinen Anzahl von Datensätzen aufgerufen wird, können Sie die Ereignisquelle anweisen, Datensätze bis zu 5 Minuten lang zu puffern, indem Sie ein *Batch-Fenster* konfigurieren. Bevor die Funktion aufgerufen wird, liest Lambda so lange Datensätze aus der Ereignisquelle, bis es einen vollständigen Batch erfasst hat, das Batch-Verarbeitungsfenster abläuft oder der Batch die Nutzlastgrenze von 6 MB erreicht. Weitere Informationen finden Sie unter [Batching-Verhalten](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**Warnung**  
Zuordnung von Lambda-Ereignisquellen verarbeiten jedes Ereignis mindestens einmal und es kann zu einer doppelten Verarbeitung von Datensätzen kommen. Um mögliche Probleme im Zusammenhang mit doppelten Ereignissen zu vermeiden, empfehlen wir Ihnen dringend, Ihren Funktionscode idempotent zu machen. Weitere Informationen finden Sie unter [Wie mache ich meine Lambda-Funktion idempotent](https://repost.aws/knowledge-center/lambda-function-idempotent) im AWS-Wissenszentrum.

Lambda wartet mit dem Senden des nächsten zu verarbeitenden Stapels nicht, bis ggf. konfigurierte [Erweiterungen](lambda-extensions.md) abgeschlossen sind. Anders ausgedrückt: Ihre Erweiterungen werden möglicherweise weiter ausgeführt, während Lambda den nächsten Stapel von Datensätzen verarbeitet. Dies kann zu Drosselungsproblemen führen, wenn Sie gegen eine Einstellung oder gegen einen Grenzwert im Zusammenhang mit der [Parallelität](lambda-concurrency.md) Ihres Kontos verstoßen. Um zu erkennen, ob möglicherweise ein Problem vorliegt, müssen Sie Ihre Funktionen überwachen sowie überprüfen, ob für Ihre Zuordnung von Ereignisquellen unerwartet hohe [Parallelitätsmetriken](monitoring-concurrency.md#general-concurrency-metrics) vorliegen. Aufgrund der kurzen Zeit zwischen den Aufrufen kann Lambda kurzzeitig eine höhere Gleichzeitigkeitsnutzung als die Anzahl der Shards melden. Dies kann sogar für Lambda-Funktionen ohne Erweiterungen gelten.

Konfigurieren Sie die [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)-Einstellung, um einen Shard eines Kinesis-Datenstroms mit mehr als einem Lambda-Aufruf gleichzeitig zu verarbeiten. Sie können die Anzahl der gleichzeitigen Batches angeben, die Lambda von einem Shard über einen Parallelisierungsfaktor von 1 (Standard) bis 10 abfragt. Wenn `ParallelizationFactor` beispielsweise auf 2 gesetzt ist, können Sie maximal 200 gleichzeitige Lambda-Aufrufe haben, um 100 Kinesis-Daten-Shards zu verarbeiten (in der Praxis werden womöglich andere Werte für die Metrik `ConcurrentExecutions` angezeigt). Dies hilft, den Verarbeitungsdurchsatz hochzuskalieren, wenn das Datenvolumen flüchtig ist und `IteratorAge` hoch ist. Wenn Sie die Anzahl gleichzeitiger Batches pro Shard erhöhen, stellt Lambda weiterhin die Auftragsverarbeitung auf Partitionsschlüsselebene sicher.

Sie können `ParallelizationFactor` auch mit der Kinesis-Aggregation verwenden. Das Verhalten der Zuordnung von Ereignisquellen hängt davon ab, ob Sie das [erweiterte Rundsenden](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) verwenden:
+ **Ohne erweitertes Rundsenden**: Alle Ereignisse innerhalb eines aggregierten Ereignisses müssen denselben Partitionsschlüssel haben. Der Partitionsschlüssel muss außerdem mit dem des aggregierten Ereignisses übereinstimmen. Wenn die Ereignisse innerhalb des aggregierten Ereignisses unterschiedliche Partitionsschlüssel haben, kann Lambda nicht garantieren, dass die Ereignisse in der richtigen Reihenfolge nach Partitionsschlüssel verarbeitet werden.
+ **Mit verbessertem Rundsenden**: Zunächst dekodiert Lambda das aggregierte Ereignis in seine einzelnen Ereignisse. Das aggregierte Ereignis kann einen anderen Partitionsschlüssel haben als die darin enthaltenen Ereignisse. Ereignisse, die nicht dem Partitionsschlüssel entsprechen, werden jedoch [gelöscht und gehen verloren](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md). Lambda verarbeitet diese Ereignisse nicht und sendet sie nicht an ein konfiguriertes Fehlerziel.

## Beispielereignis
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```

# Verarbeiten von Amazon Kinesis Data Streams-Datensätzen mit Lambda
<a name="services-kinesis-create"></a>

Um Amazon-Kinesis-Data-Streams-Datensätze mit Lambda zu verarbeiten, erstellen Sie eine Lambda-Zuordnung von Ereignisquellen. Sie können eine Lambda-Funktion zu einem Standard-Iterator oder zu einem Konsumenten für das erweiterte Rundsenden zuweisen. Weitere Informationen finden Sie unter [Abfragen und Stapeln von Streams](with-kinesis.md#kinesis-polling-and-batching).

## Erstellen einer Zuordnung von Ereignisquellen für Kinesis
<a name="services-kinesis-eventsourcemapping"></a>

Um Ihre Lambda-Funktion mit Datensätzen aus Ihrem Datenstrom aufzurufen, erstellen Sie eine [Zuordnung von Ereignisquellen](invocation-eventsourcemapping.md). Sie können mehrere Ereignisquellenzuordnungen erstellen, um gleiche Daten mit mehreren Lambda-Funktionen oder Elemente aus mehreren Daten-Streams mit nur einer Funktion zu verarbeiten. Bei der Verarbeitung von Elementen aus mehreren Datenströmen enthält jeder Batch Datensätze aus nur einem einzigen Shard oder Stream.

Sie können Zuordnungen von Ereignisquellen konfigurieren, um Datensätze aus einem Stream in einem anderen AWS-Konto zu verarbeiten. Weitere Informationen hierzu finden Sie unter [Erstellen einer kontoübergreifenden Zuordnung von Ereignisquellen](#services-kinesis-eventsourcemapping-cross-account).

Bevor Sie ein Zuordnung von Ereignisquellen erstellen, müssen Sie Ihrer Lambda-Funktion die Berechtigung zum Lesen aus einem Kinesis-Datenstrom erteilen. Lambda benötigt die folgenden Berechtigungen zum Verwalten von Ressourcen, die zu Ihrem Kinesis-Datenstrom gehören:
+ [Kinese: DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [Kinese: DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [Kinese: GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [Kinese: GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [Kinese: ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [Kinese: SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

Die AWS verwaltete Richtlinie [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html)umfasst diese Berechtigungen. Fügen Sie diese verwaltete Richtlinie zu Ihrer Funktion hinzu, wie im folgenden Verfahren beschrieben.

**Anmerkung**  
Sie benötigen keine `kinesis:ListStreams`-Berechtigung, um Zuordnungen von Ereignisquellen für Kinesis zu erstellen und zu verwalten. Wenn Sie jedoch eine Zuordnung von Ereignisquellen in der Konsole erstellen und nicht über diese Berechtigung verfügen, können Sie keinen Kinesis-Stream aus einer Dropdown-Liste auswählen und die Konsole zeigt einen Fehler an. Um die Zuordnung von Ereignisquellen zu erstellen, müssen Sie den Amazon-Ressourcennamen (ARN) Ihres Streams manuell eingeben.
Lambda ruft die APIs `kinesis:GetRecords` und `kinesis:GetShardIterator` auf, wenn fehlgeschlagene Aufrufe erneut versucht werden.

------
#### [ AWS-Managementkonsole ]

**So fügen Sie Kinesis-Berechtigungen zu Ihrer Funktion hinzu**

1. Öffnen Sie die [Funktionsseite](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole und wählen Sie Ihre Funktion aus.

1. Klicken Sie in der Registerkarte **Konfiguration** auf die Option **Berechtigungen**.

1. Wählen Sie im Bereich **Ausführungsrolle** unter **Rollenname** den Link zur Ausführungsrolle Ihrer Funktion aus. Dieser Link öffnet die Seite für diese Rolle in der IAM-Konsole.

1. Unter **Berechtigungsrichtlinien** im Abschnitt **Berechtigungen hinzufügen** wählen Sie dann **Richtlinien anfügen** aus.

1. Geben Sie im Suchfeld **AWSLambdaKinesisExecutionRole** ein.

1. Aktivieren Sie das Kontrollkästchen neben der Richtlinie und wählen Sie **Berechtigung hinzufügen**.

------
#### [ AWS CLI ]

**So fügen Sie Kinesis-Berechtigungen zu Ihrer Funktion hinzu**
+ Führen Sie den folgenden CLI-Befehl aus, um die `AWSLambdaKinesisExecutionRole`-Richtlinie zur Ausführungsrolle Ihrer Funktion hinzuzufügen:

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**So fügen Sie Kinesis-Berechtigungen zu Ihrer Funktion hinzu**
+ Fügen Sie die `Policies`-Eigenschaft wie im folgenden Beispiel gezeigt in der Definition Ihrer Funktion hinzu:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

Nachdem Sie die erforderlichen Berechtigungen konfiguriert haben, erstellen Sie die Zuordnung von Ereignisquellen.

------
#### [ AWS-Managementkonsole ]

**So erstellen Sie die Kinesis-Zuordnung von Ereignisquellen**

1. Öffnen Sie die [Funktionsseite](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole und wählen Sie Ihre Funktion aus.

1. Wählen Sie im Bereich **Function overview (Funktionsübersicht)** die Option **Add trigger (Auslöser hinzufügen)**.

1. Wählen Sie unter **Trigger-Konfiguration** als Quelle die Option **Kinesis** aus.

1. Wählen Sie den Kinesis-Stream, für den Sie die Zuordnung von Ereignisquellen erstellen möchten und optional einen Konsumenten Ihres Streams.

1. (Optional) Bearbeiten Sie die **Batch-Größe**, die **Startposition** und das **Batch-Fenster** für Ihre Zuordnung von Ereignisquellen.

1. Wählen Sie **Hinzufügen** aus.

Wenn Sie Ihre Ereignisquellenzuordnung von der Konsole aus erstellen, muss Ihre IAM-Rolle über die Berechtigungen [kinesis: ListStreams und [kinesis](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html):](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html) verfügen. ListStreamConsumers

------
#### [ AWS CLI ]

**So erstellen Sie die Kinesis-Zuordnung von Ereignisquellen**
+ Führen Sie den folgenden CLI-Befehl aus, um eine Kinesis-Zuordnung von Ereignisquellen zu erstellen. Wählen Sie Ihre eigene Batch-Größe und Startposition entsprechend Ihrem Anwendungsfall.

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

Um ein Batching-Fenster anzugeben, fügen Sie die `--maximum-batching-window-in-seconds`-Option hinzu. *Weitere Informationen zur Verwendung dieses und anderer Parameter finden Sie [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)in der AWS CLI Befehlsreferenz.*

------
#### [ AWS SAM ]

**So erstellen Sie die Kinesis-Zuordnung von Ereignisquellen**
+ Fügen Sie die `KinesisEvent`-Eigenschaft wie im folgenden Beispiel gezeigt in der Definition Ihrer Funktion hinzu:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

Weitere Informationen zum Erstellen einer Ereignisquellenzuordnung für Kinesis Data Streams finden Sie unter [Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html) im *AWS Serverless Application Model Entwicklerhandbuch*. AWS SAM

------

## Abfrage und Startposition des Streams
<a name="services-kinesis-stream-start-pos"></a>

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.
+ Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.
+ Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie `LATEST` als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Startposition des Streams als `TRIM_HORIZON` oder `AT_TIMESTAMP` an.

## Erstellen einer kontoübergreifenden Zuordnung von Ereignisquellen
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

Amazon Kinesis Data Streams unterstützt [ressourcenbasierte Richtlinien](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html). Aus diesem Grund können Sie in einen Stream aufgenommene Daten in einem Konto AWS-Konto mit einer Lambda-Funktion in einem anderen Konto verarbeiten.

Um eine Ereignisquellenzuordnung für Ihre Lambda-Funktion mithilfe eines Kinesis-Streams in einem anderen zu erstellen AWS-Konto, müssen Sie den Stream mithilfe einer ressourcenbasierten Richtlinie konfigurieren, um Ihrer Lambda-Funktion die Berechtigung zum Lesen von Elementen zu erteilen. Informationen dazu, wie Sie Ihren Stream so konfigurieren, dass er kontoübergreifenden Zugriff ermöglicht, finden Sie unter [Zugriff mit kontoübergreifenden AWS Lambda Funktionen teilen](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda) im *Amazon Kinesis Streams Streams-Entwicklerhandbuch*.

Sobald Sie Ihren Stream mit einer ressourcenbasierten Richtlinie konfiguriert haben, die Ihrer Lambda-Funktion die erforderlichen Berechtigungen erteilt, erstellen Sie die Zuordnung von Ereignisquellen mit einer der im vorherigen Abschnitt beschriebenen Methoden.

Wenn Sie Ihre Zuordnung von Ereignisquellen über die Lambda-Konsole erstellen möchten, fügen Sie den ARN Ihres Streams direkt in das Eingabefeld ein. Wenn Sie einen Verbraucher für Ihren Stream angeben möchten, wird durch Einfügen der ARN des Verbrauchers automatisch das Stream-Feld ausgefüllt.

# Konfigurieren einer teilweisen Batch-Antwort mit Kinesis Data Streams und Lambda
<a name="services-kinesis-batchfailurereporting"></a>

Beim Konsumieren und Verarbeiten von Streaming-Daten aus einer Ereignisquelle werden standardmäßig Lambda-Checkpoints auf die höchste Sequenznummer eines Batches nur dann überprüft, wenn der Batch ein voller Erfolg ist. Lambda behandelt alle anderen Ergebnisse als einen vollständigen Fehler und versucht, den Batch bis zum Wiederholungslimit zu verarbeiten. Um beim Verarbeiten von Stapeln aus einem Stream Teilerfolge zu ermöglichen, aktivieren Sie `ReportBatchItemFailures`. Das Zulassen von Teilerfolgen kann dazu beitragen, die Anzahl der Wiederholungen in einer Aufzeichnung zu reduzieren, obwohl die Möglichkeit von Wiederholungen in einer erfolgreichen Aufzeichnung nicht vollständig verhindert wird.

Zum Aktivieren von `ReportBatchItemFailures` fügen Sie den Enum-Wert **ReportBatchItemFailures** der [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)-Liste hinzu. Diese Liste zeigt an, welche Antworttypen für Ihre Funktion aktiviert sind. Sie können diese Liste konfigurieren, wenn Sie eine Zuordnung von Ereignisquellen [erstellen](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) oder [aktualisieren](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html).

**Anmerkung**  
Selbst wenn Ihr Funktionscode teilweise Antworten bei Batch-Fehlern zurückgibt, werden diese Antworten nicht von Lambda verarbeitet, es sei denn, das `ReportBatchItemFailures`-Feature ist explizit für Ihre Zuordnung von Ereignisquellen aktiviert.

## Berichtsyntax
<a name="streams-batchfailurereporting-syntax"></a>

Beim Konfigurieren von Berichten zu Batch-Elementfehlern wird die `StreamsEventResponse`-Klasse mit einer Liste von Batch-Elementfehlern zurückgegeben. Sie können ein `StreamsEventResponse`-Objekt verwenden, um die Sequenznummer des ersten fehlgeschlagenen Datensatzes im Batch zurückzugeben. Sie können auch Ihre eigene benutzerdefinierte Klasse mit der richtigen Antwortsyntax erstellen. Die folgende JSON-Struktur zeigt die erforderliche Antwortsyntax:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**Anmerkung**  
Wenn das `batchItemFailures`-Array mehrere Elemente enthält, verwendet Lambda den Datensatz mit der niedrigsten Sequenznummer als Kontrollpunkt. Lambda wiederholt dann alle Datensätze ab diesem Kontrollpunkt.

## Erfolgs- und Misserfolgsbedingungen
<a name="streams-batchfailurereporting-conditions"></a>

Lambda behandelt einen Batch als vollständigen Erfolg, wenn Sie eines der folgenden Elemente zurückgeben:
+ Eine leere `batchItemFailure`-Liste
+ Eine ungültige `batchItemFailure`-Liste
+ Ein leeres `EventResponse`
+ Ein ungültiges `EventResponse`

Lambda behandelt einen Batch als vollständigen Misserfolg, wenn Sie eines der folgenden Elemente zurückgeben:
+ Eine leere Zeichenfolge `itemIdentifier`
+ Ein ungültiges `itemIdentifier`
+ Ein `itemIdentifier` mit einem falschen Schlüsselnamen

Lambda wiederholt Fehler basierend auf Ihrer Wiederholungsstrategie.

## Einen Batch halbieren
<a name="streams-batchfailurereporting-bisect"></a>

Wenn Ihr Aufruf fehlschlägt und `BisectBatchOnFunctionError` eingeschaltet ist, wird der Stapel unabhängig von Ihrer `ReportBatchItemFailures`-Einstellung halbiert.

Wenn eine partielle Batch-Erfolgsantwort empfangen wird und sowohl `BisectBatchOnFunctionError` als auch `ReportBatchItemFailures` aktiviert sind, wird der Batch mit der zurückgegebenen Sequenznummer halbiert und Lambda versucht nur die verbleibenden Datensätze erneut.

Um die Implementierung der partiellen Batch-Antwortlogik zu vereinfachen, sollten Sie in Erwägung ziehen, das [Batch Processor Utility](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) von Powertools for zu verwenden AWS Lambda, das diese Komplexität automatisch für Sie erledigt.

Hier sind einige Beispiele für Funktionscode, der die Liste der fehlgeschlagenen Nachrichten IDs im Batch zurückgibt:

------
#### [ .NET ]

**SDK für .NET**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }
    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
```

------
#### [ Go ]

**SDK für Go V2**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}

	for _, record := range kinesisEvent.Records {
		curRecordSequenceNumber := ""

		// Process your record
		if /* Your record processing condition here */ {
			curRecordSequenceNumber = record.Kinesis.SequenceNumber
		}

		// Add a condition to check if the record processing failed
		if curRecordSequenceNumber != "" {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK für Java 2.x**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                //Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse(batchItemFailures);   
    }
}
```

------
#### [ JavaScript ]

**SDK für JavaScript (v3)**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Javascript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Melden von Fehlern Kinesis Kinesis-Batch-Elementen mit Lambda unter Verwendung von. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK für PHP**  
 Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK für Python (Boto3)**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK für Ruby**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
```

------
#### [ Rust ]

**SDK für Rust**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Powertools für die AWS Lambda Batch-Verarbeitung verwenden
<a name="services-kinesis-batchfailurereporting-powertools"></a>

Das Batchverarbeitungsprogramm von Powertools for verarbeitet AWS Lambda automatisch die Logik für partielle Batch-Antworten und reduziert so die Komplexität der Implementierung von Batch-Fehlerberichten. Hier sind Beispiele für die Verwendung des Batch-Prozessors:

**Python**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Verarbeitung von Kinesis Data Streams Streams-Stream-Datensätzen mit einem AWS Lambda Batch-Prozessor.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Verarbeitung von Kinesis Data Streams Streams-Stream-Datensätzen mit einem AWS Lambda Batch-Prozessor.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Verarbeitung von Kinesis Data Streams Streams-Stream-Datensätzen mit einem AWS Lambda Batch-Prozessor.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
```

**.NET**  
Umfassende Beispiele und Anweisungen zur Einrichtung finden Sie in der [Dokumentation zum Batch-Prozessor](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Verarbeitung von Kinesis Data Streams Streams-Stream-Datensätzen mit einem AWS Lambda Batch-Prozessor.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent> 
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId)) 
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Aufbewahrung verworfener Batch-Datensätze für eine Kinesis Data Streams-Ereignisquelle in Lambda
<a name="kinesis-on-failure-destination"></a>

Die Fehlerbehandlung für Kinesis-Zuordnungen von Ereignisquellen hängt davon ab, ob der Fehler vor dem Aufruf der Funktion oder während des Funktionsaufrufs auftritt:
+ **Vor dem Aufruf:** Wenn eine Lambda-Ereignisquellenzuordnung die Funktion aufgrund von Drosselung oder anderen Problemen nicht aufrufen kann, versucht sie es erneut, bis die Datensätze ablaufen oder das in der Ereignisquellenzuordnung () konfigurierte Höchstalter überschreiten. [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)
+ **Während des Aufrufs:** Wenn die Funktion aufgerufen wird, aber einen Fehler zurückgibt, versucht Lambda es erneut, bis die Datensätze ablaufen, das Höchstalter ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) überschreiten oder die konfigurierte Wiederholungsquote () erreicht haben. [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts) Bei Funktionsfehlern können Sie auch konfigurieren [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), dass ein fehlgeschlagener Batch in zwei kleinere Batches aufgeteilt wird, wodurch fehlerhafte Datensätze isoliert und Timeouts vermieden werden. Durch das Aufteilen von Batches wird die Quote für Wiederholungen nicht verbraucht.

Wenn die Fehlerbehandlungsmaßnahmen fehlschlagen, verwirft Lambda die Datensätze und setzt die Verarbeitung von Batches aus dem Stream fort. Bei den Standardeinstellungen bedeutet dies, dass ein fehlerhafter Datensatz die Verarbeitung auf dem betroffenen Shard für bis zu eine Woche blockieren kann. Um dies zu vermeiden, konfigurieren Sie die Ereignisquellenzuordnung Ihrer Funktion mit einer angemessenen Anzahl von Wiederholungen und einem maximalen Datensatzalter, das zu Ihrem Anwendungsfall passt.

## Konfigurieren von Zielen für fehlgeschlagene Aufrufe
<a name="kinesis-on-failure-destination-console"></a>

Um Datensätze zu fehlgeschlagenen Aufrufen zur Zuordnung von Ereignisquellen beizubehalten, fügen Sie der Zuordnung von Ereignisquellen Ihrer Funktion ein Ziel hinzu. Jeder an das Ziel gesendete Datensatz ist ein JSON-Dokument mit Metadaten über den fehlgeschlagenen Aufruf. Bei Amazon S3-Zielen sendet Lambda auch den gesamten Aufrufdatensatz zusammen mit den Metadaten. Sie können jedes Amazon SNS SNS-Thema, jede Amazon SQS SQS-Warteschlange, jeden Amazon S3 S3-Bucket oder Kafka als Ziel konfigurieren.

Bei Amazon-S3-Zielen können Sie das Feature [Amazon S3 Event Notifications](https://docs.aws.amazon.com/) verwenden, um Benachrichtigungen zu erhalten, wenn Objekte in Ihren S3-Ziel-Bucket hochgeladen werden. Sie können S3-Ereignisbenachrichtigungen auch so konfigurieren, dass sie eine andere Lambda-Funktion aufrufen, um eine automatische Verarbeitung für fehlgeschlagene Stapel durchzuführen.

Ihre Ausführungsrolle muss über Berechtigungen für das Ziel verfügen:
+ **[Für ein SQS-Ziel: sqs: SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)**
+ **[Für ein SNS-Ziel: sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)**
+ **Für ein S3-Ziel: s3:** [und [s3: PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Für ein Kafka-Ziel: [kafka-cluster](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html):** WriteData

Sie können ein Kafka-Thema als Ziel für den Fall eines Fehlers für Ihre Quellenzuordnungen für Kafka-Ereignisse konfigurieren. Wenn Lambda Datensätze nach anstrengenden Wiederholungsversuchen nicht verarbeiten kann oder wenn Datensätze das Höchstalter überschreiten, sendet Lambda die fehlgeschlagenen Datensätze zur späteren Verarbeitung an das angegebene Kafka-Thema. Weitere Informationen finden Sie unter [Ein Kafka-Thema als Ziel für den Fall eines Fehlers verwenden](kafka-on-failure-destination.md).

[Wenn Sie die Verschlüsselung mit Ihrem eigenen KMS-Schlüssel für ein S3-Ziel aktiviert haben, muss die Ausführungsrolle Ihrer Funktion auch die Berechtigung haben, kms: aufzurufen. GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) Wenn sich der KMS-Schlüssel und das S3-Bucket-Ziel in einem anderen Konto als Ihre Lambda-Funktion und Ausführungsrolle befinden, konfigurieren Sie den KMS-Schlüssel so, dass er der Ausführungsrolle vertraut, die zugelassen kms: GenerateDataKey wird.

Gehen Sie folgendermaßen vor, um ein Ausfallziel mit der Konsole zu konfigurieren:

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie eine Funktion aus.

1. Wählen Sie unter **Function overview (Funktionsübersicht)** die Option **Add destination (Ziel hinzufügen)**.

1. Wählen Sie als **Quelle** die Option **Aufruf der Zuordnung von Ereignisquellen** aus.

1. Wählen Sie für die **Zuordnung von Ereignisquellen** eine Ereignisquelle aus, die für diese Funktion konfiguriert ist.

1. Wählen Sie für **Bedingung** die Option **Bei Ausfall** aus. Für Aufrufe zur Zuordnung von Ereignisquellen ist dies die einzig akzeptierte Bedingung.

1. Wählen Sie unter **Zieltyp** den Zieltyp aus, an den Lambda Aufrufdatensätze sendet.

1. Wählen Sie unter **Destination (Ziel)** eine Ressource aus.

1. Wählen Sie **Speichern**.

Sie können mit AWS Command Line Interface (AWS CLI) auch ein Ziel für den Fall eines Fehlers konfigurieren. Mit dem folgenden [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)Befehl wird beispielsweise eine Zuordnung der Ereignisquelle mit einem SQS-Ziel für den Fall eines Fehlers hinzugefügt: `MyFunction`

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

Der folgende [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html)Befehl aktualisiert eine Ereignisquellenzuordnung, sodass fehlgeschlagene Aufrufdatensätze nach zwei Wiederholungsversuchen oder wenn die Datensätze älter als eine Stunde sind, an ein SNS-Ziel gesendet werden.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

Aktualisierte Einstellungen werden asynchron angewendet und werden erst nach Abschluss des Vorgangs in der Ausgabe berücksichtigt. Verwenden Sie den [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html)-Befehl, um den aktuellen Status anzuzeigen.

Um ein Ziel zu entfernen, geben Sie eine leere Zeichenfolge als Argument für den `destination-config`-Parameter an:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Bewährte Methoden für die Sicherheit in Amazon S3-Zielen
<a name="kinesis-s3-destination-security"></a>

Das Löschen eines S3-Buckets, der als Ziel konfiguriert ist, ohne das Ziel aus der Konfiguration Ihrer Funktion zu entfernen, kann ein Sicherheitsrisiko darstellen. Wenn ein anderer Benutzer den Namen Ihres Ziel-Buckets kennt, kann er den Bucket in seinem AWS-Konto neu erstellen. Aufzeichnungen über fehlgeschlagene Aufrufe werden an den entsprechenden Bucket gesendet, wodurch möglicherweise Daten aus Ihrer Funktion verfügbar gemacht werden.

**Warnung**  
Um sicherzustellen, dass Aufrufdatensätze Ihrer Funktion nicht an einen S3-Bucket in einem anderen gesendet werden können AWS-Konto, fügen Sie der Ausführungsrolle Ihrer Funktion eine Bedingung hinzu, die die `s3:PutObject` Berechtigungen auf Buckets in Ihrem Konto beschränkt. 

-Das folgende Beispiel zeigt eine IAM-Richtlinie, die die `s3:PutObject`-Berechtigungen Ihrer Funktion auf Buckets in Ihrem Konto beschränkt. Diese Richtlinie gibt Lambda auch die `s3:ListBucket`-Erlaubnis, einen S3-Bucket als Ziel zu verwenden.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Um der Ausführungsrolle Ihrer Funktion mithilfe von AWS-Managementkonsole oder eine Berechtigungsrichtlinie hinzuzufügen AWS CLI, lesen Sie die Anweisungen in den folgenden Verfahren:

------
#### [ Console ]

**So fügen Sie der Ausführungsrolle einer Funktion (Konsole) eine Berechtigungsrichtlinie hinzu**

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie die Lambda-Funktion aus, deren Ausführungsrolle Sie ändern möchten.

1. Klicken Sie in der Registerkarte **Konfiguration** auf die Option **Berechtigungen**.

1. Wählen Sie auf der Registerkarte **Ausführungsrolle** den **Rollennamen** Ihrer Funktion aus, um die IAM-Konsolenseite der Rolle zu öffnen.

1. Fügen Sie der Rolle wie folgt eine Richtlinie mit Berechtigungen hinzu:

   1. Wählen Sie im Bereich **Berechtigungsrichtlinien** die Optionen **Berechtigungen hinzufügen** und dann **Inline-Richtlinie erstellen** aus.

   1. Wählen Sie im **Richtlinien-Editor** **JSON** aus.

   1. Fügen Sie die Richtlinie, die Sie hinzufügen möchten, in den Editor ein (indem Sie die vorhandene JSON-Datei ersetzt) und wählen Sie dann **Weiter** aus.

   1. Geben Sie unter **Richtliniendetails** für den **Richtliniennamen** ein.

   1. Wählen Sie **Richtlinie erstellen** aus.

------
#### [ AWS CLI ]

**So fügen Sie der Ausführungsrolle einer Funktion (CLI) eine Berechtigungsrichtlinie hinzu**

1. Erstellen Sie ein JSON-Richtliniendokument mit den erforderlichen Berechtigungen und speichern Sie es in einem lokalen Verzeichnis.

1. Verwenden Sie den `put-role-policy` IAM-CLI-Befehl, um die Berechtigungen zur Ausführungsrolle Ihrer Funktion hinzuzufügen. Führen Sie den folgenden Befehl in dem Verzeichnis aus, in dem Sie Ihr JSON-Richtliniendokument gespeichert haben und ersetzen Sie den Rollennamen, den Richtliniennamen und das Richtliniendokument durch Ihre eigenen Werte.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Beispiel für einen Amazon SNS- und Amazon SQS-Aufrufsatz
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

Das folgende Beispiel zeigt, was Lambda bei einem fehlgeschlagenen Aufruf der Kinesis-Ereignisquelle an eine SQS-Warteschlange oder ein SNS-Thema sendet. Da Lambda nur die Metadaten für diese Zieltypen sendet, verwenden Sie die Felder `streamArn`, `shardId`, `startSequenceNumber` und `endSequenceNumber`, um den vollständigen Originaldatensatz abzurufen. Alle in der `KinesisBatchInfo`-Eigenschaft angezeigten Felder sind immer vorhanden.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}
```

Sie können diese Informationen verwenden, um die betroffenen Datensätze aus dem Stream für die Fehlersuche abzurufen. Die tatsächlichen Datensätze sind nicht enthalten, daher müssen Sie diesen Datensatz verarbeiten und aus dem Stream abrufen, bevor sie ablaufen und verloren gehen.

### Beispiel eines Amazon S3-Aufrufdatensatzes
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

Das folgende Beispiel zeigt, was Lambda bei einem fehlgeschlagenen Aufruf der Kinesis-Ereignisquelle an einen Amazon S3-Bucket sendet. Zusätzlich zu allen Feldern aus dem vorherigen Beispiel für SQS- und SNS-Ziele enthält das Feld `payload` den ursprünglichen Aufrufdatensatz als maskierte JSON-Zeichenfolge.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

Das S3-Objekt, das den Aufrufdatensatz enthält, verwendet die folgende Namenskonvention:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Implementierung der zustandsabhängigen Verarbeitung von Kinesis-Datenströmen in Lambda
<a name="services-kinesis-windows"></a>

Lambda-Funktionen können kontinuierliche Stream-Verarbeitungsanwendungen ausführen. Ein Stream entspricht einer unbegrenzten Menge von Daten, die kontinuierlich durch Ihre Anwendung fließen. Um Informationen aus dieser sich ständig aktualisierenden Eingabe zu analysieren, können Sie die enthaltenen Datensätze mithilfe eines zeitlich definierten Fensters binden.

Rollierende Fenster sind unterschiedliche Zeitfenster, die sich in regelmäßigen Abständen öffnen und schließen. Standardmäßig sind Lambda-Aufrufe zustandslos – Sie können sie nicht für die Verarbeitung von Daten über mehrere kontinuierliche Aufrufe hinweg ohne eine externe Datenbank verwenden. Mit rollierenden Fenstern können Sie jedoch Ihren Status über Aufrufe hinweg beibehalten. Dieser Zustand enthält das Gesamtergebnis der Nachrichten, die zuvor für das aktuelle Fenster verarbeitet wurden. Ihr Zustand kann maximal 1 MB pro Shard betragen. Wenn er diese Größe überschreitet, wird Lambda das Fenster vorzeitig beenden.

Jeder Datensatz in einem Stream gehört zu einem bestimmten Fenster. Lambda verarbeitet jeden Datensatz mindestens einmal, garantiert jedoch nicht, dass jeder Datensatz nur einmal verarbeitet wird. In seltenen Fällen, etwa bei der Fehlerbehandlung, werden einige Datensätze möglicherweise mehrmals verarbeitet. Datensätze werden beim ersten Mal immer in der richtigen Reihenfolge verarbeitet. Wenn Datensätze mehr als einmal verarbeitet werden, werden sie nicht in der richtigen Reihenfolge verarbeitet.

## Aggregation und Verarbeitung
<a name="streams-tumbling-processing"></a>

Ihre benutzerverwaltete Funktion wird sowohl zur Aggregation als auch zur Verarbeitung der Endergebnisse dieser Aggregation aufgerufen. Lambda aggregiert alle im Fenster empfangenen Datensätze. Sie können diese Datensätze in mehreren Stapeln erhalten, jeweils als ein separater Aufruf. Jeder Aufruf erhält einen Zustand. Wenn Sie also rollierende Fenster verwenden, muss Ihre Lambda-Funktionsantwort eine `state`-Eigenschaft enthalten. Wenn die Antwort keine `state`-Eigenschaft enthält, betrachtet Lambda dies als fehlgeschlagenen Aufruf. Um diese Bedingung zu erfüllen, kann Ihre Funktion ein `TimeWindowEventResponse`-Objekt zurückgeben, das die folgende JSON-Form aufweist:

**Example `TimeWindowEventResponse`-Werte**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**Anmerkung**  
Für Java-Funktionen empfehlen wir, eine `Map<String, String>` zu verwenden, um den Status darzustellen.

Am Ende des Fensters wird das Flag `isFinalInvokeForWindow` auf `true` gesetzt, um anzugeben, dass es sich um den Endzustand handelt und dass es für die Verarbeitung bereit ist. Nach der Verarbeitung werden das Fenster und Ihr endgültiger Aufruf wird abgeschlossen, und dann wird der Zustand gelöscht.

Am Ende Ihres Fensters verwendet Lambda die endgültige Verarbeitung für Aktionen an den Aggregationsergebnissen. Ihre endgültige Verarbeitung wird synchron aufgerufen. Nach erfolgreichem Aufruf zeigt Ihre Funktion auf die Sequenznummer und die Stream-Verarbeitung wird fortgesetzt. Wenn der Aufruf nicht erfolgreich ist, unterbricht Ihre Lambda-Funktion die weitere Verarbeitung bis zu einem erfolgreichen Aufruf.

**Example KinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Konfiguration
<a name="streams-tumbling-config"></a>

Sie können rollierende Fenster konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren. Um ein Taumelfenster zu konfigurieren, geben Sie das Fenster in Sekunden an ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). Der folgende Beispielbefehl AWS Command Line Interface (AWS CLI) erstellt eine Quellenzuordnung für Streaming-Ereignisse mit einem Wechselfenster von 120 Sekunden. Die für Aggregation und Verarbeitung definierte Lambda-Funktion wird `tumbling-window-example-function` genannt.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda bestimmt die rollierenden Fenstergrenzen basierend auf dem Zeitpunkt, zu dem Datensätze in den Stream eingefügt wurden. Für alle Datensätze steht ein ungefährer Zeitstempel zur Verfügung, den Lambda in Grenzbestimmungen verwendet.

Rollierende Fensteraggregationen unterstützen kein Resharding. Wenn ein Shard endet, betrachtet Lambda das aktuelle Fenster als geschlossen, und alle untergeordneten Shards beginnen ihr eigenes Fenster in einem neuen Zustand. Wenn dem aktuellen Fenster keine neuen Datensätze hinzugefügt werden, wartet Lambda bis zu 2 Minuten, bevor es annimmt, dass das Fenster zu Ende ist. Dadurch wird sichergestellt, dass die Funktion alle Datensätze im aktuellen Fenster liest, auch wenn die Datensätze in Abständen hinzugefügt werden.

Rollierende Fenster unterstützen vollständig die bestehenden Wiederholungsrichtlinien `maxRetryAttempts` und `maxRecordAge`.

**Example Handler.py – Aggregation und Verarbeitung**  
Die folgende Python-Funktion veranschaulicht, wie Sie Ihren Endzustand aggregieren und dann verarbeiten:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Lambda-Parameter für Amazon Kinesis Data Streams Zuordnungen von Ereignisquellen
<a name="services-kinesis-parameters"></a>

Alle Lambda-Zuordnungen von Ereignisquellen verwenden dieselben API-Operationen [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) und [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html). Allerdings gelten nur einige der Parameter für Kinesis.


| Parameter | Erforderlich | Standard | Hinweise | 
| --- | --- | --- | --- | 
|  [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize)  |  N  |  100  |  Höchstwert: 10 000.  | 
|  [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BisectBatchOnFunctionError)  |  N  |  false  |  Keine | 
|  [DestinationConfig](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-DestinationConfig)  |  N  | N/A |  Ein Ziel der Amazon-SQS-Warteschlange oder des Amazon-SNS-Themas für verworfene Datensätze. Weitere Informationen finden Sie unter [Konfigurieren von Zielen für fehlgeschlagene Aufrufe](kinesis-on-failure-destination.md#kinesis-on-failure-destination-console).  | 
|  [Aktiviert](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-Enabled)  |  N  |  true  |  Keine | 
|  [EventSourceArn](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-EventSourceArn)  |  Y  | N/A |  Der ARN des Datenstroms oder eines Stream-Konsumenten  | 
|  [FunctionName](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionName)  |  Y  | N/A |  Keine | 
|  [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)  |  N  |  N/A |  Damit Ihre Funktion bestimmte Fehler in einem Batch meldet, beziehen Sie den Wert `ReportBatchItemFailures` in `FunctionResponseTypes` ein. Weitere Informationen finden Sie unter [Konfigurieren einer teilweisen Batch-Antwort mit Kinesis Data Streams und Lambda](services-kinesis-batchfailurereporting.md).  | 
|  [MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)  |  N  |  0  |  Keine | 
|  [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)  |  N  |  -1  |  -1 bedeutet unendlich: Lambda verwirft keine Datensätze ([Kinesis Data Streams-Datenaufbewahrungseinstellungen](https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html) gelten weiterhin) Minimum: -1 Höchstwert: 604 800  | 
|  [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)  |  N  |  -1  |  -1 bedeutet unendlich: Fehlgeschlagene Datensätze werden wiederholt, bis der Datensatz abläuft Minimum: -1 Höchstwert: 10 000.  | 
|  [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)  |  N  |  1  |  Maximum: 10  | 
|  [StartingPosition](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPosition)  |  Y  |  N/A |  AT\$1TIMESTAMP, TRIM\$1HORIZON, oder LATEST  | 
|  [StartingPositionTimestamp](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPositionTimestamp)  |  N  |  N/A |  Nur gültig, wenn StartingPosition auf AT\$1TIMESTAMP gesetzt ist. Die Zeit, ab der mit dem Lesen begonnen werden soll, in Unix-Zeitsekunden  | 
|  [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)  |  N  |  N/A |  Minimum: 0 Maximum: 900  | 

# Verwendung der Ereignisfilterung mit einer Kinesis-Ereignisquelle
<a name="with-kinesis-filtering"></a>

Sie können die Ereignisfilterung verwenden, um zu steuern, welche Datensätze aus einem Stream oder einer Warteschlange Lambda an Ihre Funktion sendet. Allgemeine Informationen über die Funktionsweise der Ereignisfilterung finden Sie unter [Steuern Sie, welche Ereignisse Lambda an Ihre Funktion sendet](invocation-eventfiltering.md).

Dieser Abschnitt konzentriert sich auf die Ereignisfilterung für Kinesis-Ereignisquellen.

**Anmerkung**  
Kinesis-Zuordnungen von Ereignisquellen unterstützen nur das Filtern nach dem `data`-Schlüssel.

**Topics**
+ [Grundlagen der Kinesis-Ereignisfilterung](#filtering-kinesis)
+ [Filtern von aggregierten Kinesis-Datensätzen](#filtering-kinesis-efo)

## Grundlagen der Kinesis-Ereignisfilterung
<a name="filtering-kinesis"></a>

Angenommen, ein Producer gibt JSON-formatierte Daten in Ihren Kinesis-Datenstrom ein. Ein Beispieldatensatz würde wie folgt aussehen, wobei die JSON-Daten im `data`-Feld in eine Base64-kodierte Zeichenfolge umgewandelt wurden.

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

Solange die Daten, die der Producer in den Stream einspeist, als JSON gültig sind, können Sie die Ereignisfilterung verwenden, um Datensätze anhand des `data`-Schlüssels zu filtern. Nehmen wir an, ein Produzent gibt Datensätze im folgenden JSON-Format in Ihren Kinesis-Stream ein.

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

Um nur die Datensätze zu filtern, bei denen der Auftragstyp "Kaufen" ist, würde das `FilterCriteria`-Objekt wie folgt aussehen.

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

Zur Verdeutlichung sehen Sie hier den Wert des Filter-`Pattern` in reinem JSON. 

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

Sie können Ihren Filter mithilfe der Konsole, AWS CLI oder einer AWS SAM-Vorlage hinzufügen.

------
#### [ Console ]

Um diesen Filter mithilfe der Konsole hinzuzufügen, folgen Sie den Anweisungen unter [Anhängen von Filterkriterien an eine Ereignisquellenzuordnung (Konsole)](invocation-eventfiltering.md#filtering-console) und geben Sie die folgende Zeichenfolge für die **Filterkriterien** ein.

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

Um eine neue Zuordnung von Ereignisquellen mit diesen Filterkriterien über die AWS Command Line Interface (AWS CLI) zu erstellen, führen Sie den folgenden Befehl aus.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

Führen Sie den folgenden Befehl aus, um diese Filterkriterien zu einer vorhandenen Zuordnung von Ereignisquellen hinzuzufügen.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

Um diesen Filter mit der AWS SAM hinzuzufügen, fügen Sie das folgende Snippet zur YAML-Vorlage für Ihre Ereignisquelle hinzu.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Um Ereignisse aus Kinesis-Quellen ordnungsgemäß zu filtern, müssen sowohl das Datenfeld als auch die Filterkriterien für das Datenfeld ein gültiges JSON-Format aufweisen. Wenn eines der Felder kein gültiges JSON-Format hat, verwirft Lambda die Nachricht oder gibt eine Ausnahme aus. In der folgenden Tabelle ist das Verhalten zusammengefasst: 


| Format der eingehenden Daten | Filtermusterformat für Dateneigenschaften | Resultierende Aktion | 
| --- | --- | --- | 
|  Gültiges JSON  |  Gültiges JSON  |  Lambda filtert basierend auf Ihren Filterkriterien.  | 
|  Gültiges JSON  |  Kein Filtermuster für Dateneigenschaften  |  Lambda filtert (nur für die anderen Metadateneigenschaften) basierend auf Ihren Filterkriterien.  | 
|  Gültiges JSON  |  Kein JSON  |  Lambda gibt zum Zeitpunkt der Erstellung oder Aktualisierung der Ereignisquellenzuordnung eine Ausnahme aus. Das Filtermuster für Dateneigenschaften muss ein gültiges JSON-Format haben.  | 
|  Kein JSON  |  Gültiges JSON  |  Lambda verwirft den Datensatz.  | 
|  Kein JSON  |  Kein Filtermuster für Dateneigenschaften  |  Lambda filtert (nur für die anderen Metadateneigenschaften) basierend auf Ihren Filterkriterien.  | 
|  Kein JSON  |  Kein JSON  |  Lambda gibt zum Zeitpunkt der Erstellung oder Aktualisierung der Ereignisquellenzuordnung eine Ausnahme aus. Das Filtermuster für Dateneigenschaften muss ein gültiges JSON-Format haben.  | 

## Filtern von aggregierten Kinesis-Datensätzen
<a name="filtering-kinesis-efo"></a>

Mit Kinesis können Sie mehrere Datensätze in einem einzigen Kinesis-Datenstrom-Datensatz zusammenfassen, um Ihren Datendurchsatz zu erhöhen. Lambda kann Filterkriterien nur auf aggregierte Datensätze anwenden, wenn Sie Kinesis [Enhanced Fanout](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) verwenden. Das Filtern aggregierter Datensätze mit Standard-Kinesis wird nicht unterstützt. Bei der Verwendung von Enhanced Fan-Out konfigurieren Sie einen dedizierten Kinesis-Durchsatzverbraucher, der als Auslöser für Ihre Lambda-Funktion dient. Lambda filtert dann die aggregierten Datensätze und übergibt nur die Datensätze, die Ihren Filterkriterien entsprechen.

Weitere Informationen zur Aggregation von Kinesis-Datensätzen finden Sie im Abschnitt [Aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation) auf der Seite mit den wichtigsten Konzepten der Kinesis Producer Library (KPL). Weitere Informationen zur Verwendung von Lambda mit Kinesis Enhanced Fanout finden Sie im AWS-Compute-Blog unter [Erhöhung der Streamverarbeitungsleistung in Echtzeit mit Amazon Kinesis Data Streams Enhanced Fanout und AWS Lambda](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/).

# Anleitung: Verwenden von Lambda mit Kinesis Data Streams
<a name="with-kinesis-example"></a>

In dieser Anleitung erstellen Sie eine Lambda-Funktion, um Ereignisse aus einem Amazon Kinesis Data Streams zu nutzen. 

1. Die benutzerdefinierte Anwendung schreibt die Datensätze zum Stream.

1. AWS Lambda fragt den Stream ab und ruft, wenn es neue Datensätze im Stream erkennt, Ihre Lambda-Funktion auf.

1. AWS Lambda führt die Lambda-Funktion aus, indem es die Ausführungsrolle annimmt, die Sie bei der Erstellung der Lambda-Funktion angegeben haben.

## Voraussetzungen
<a name="with-kinesis-prepare"></a>

### Installieren Sie die AWS Command Line Interface
<a name="install_aws_cli"></a>

Wenn Sie das noch nicht installiert haben AWS Command Line Interface, folgen Sie den Schritten unter [Installieren oder Aktualisieren der neuesten Version von AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html), um es zu installieren.

Das Tutorial erfordert zum Ausführen von Befehlen ein Befehlszeilenterminal oder eine Shell. Verwenden Sie unter Linux und macOS Ihre bevorzugte Shell und Ihren bevorzugten Paketmanager.

**Anmerkung**  
In Windows werden einige Bash-CLI-Befehle, die Sie häufig mit Lambda verwenden (z. B. `zip`), von den integrierten Terminals des Betriebssystems nicht unterstützt. Um eine in Windows integrierte Version von Ubuntu und Bash zu erhalten, [installieren Sie das Windows-Subsystem für Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Erstellen der Ausführungsrolle
<a name="with-kinesis-example-create-iam-role"></a>

Erstellen Sie die [Ausführungsrolle](lambda-intro-execution-role.md), die Ihrer Funktion die Berechtigung zum Zugriff auf AWS Ressourcen erteilt.

**So erstellen Sie eine Ausführungsrolle**

1. Öffnen Sie die Seite [Roles (Rollen)](https://console.aws.amazon.com/iam/home#/roles) in der IAM-Konsole.

1. Wählen Sie **Rolle erstellen** aus.

1. Erstellen Sie eine Rolle mit den folgenden Eigenschaften.
   + **Trusted entity (Vertrauenswürdige Entität** – **AWS Lambda**.
   + **Berechtigungen** — **AWSLambdaKinesisExecutionRole**.
   + **Role name (Name der Rolle** – **lambda-kinesis-role**.

Die **AWSLambdaKinesisExecutionRole**Richtlinie verfügt über die Berechtigungen, die die Funktion zum Lesen von Elementen aus Kinesis und zum CloudWatch Schreiben von Protokollen in Logs benötigt.

## Erstellen der Funktion
<a name="with-kinesis-example-create-function"></a>

Erstellen Sie eine Lambda-Funktion, die Ihre Kinesis-Nachrichten verarbeitet. Der Funktionscode protokolliert die Ereignis-ID und die Ereignisdaten des Kinesis-Datensatzes in CloudWatch Logs.

In diesem Tutorial wird die Runtime Node.js 24 verwendet, aber wir haben auch Beispielcode in anderen Laufzeitsprachen bereitgestellt. Sie können die Registerkarte im folgenden Feld auswählen, um Code für die gewünschte Laufzeit anzusehen. Der JavaScript Code, den Sie in diesem Schritt verwenden, befindet sich im ersten Beispiel, das auf der **JavaScript**Registerkarte angezeigt wird.

------
#### [ .NET ]

**SDK für .NET**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK für Go V2**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK für Java 2.x**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK für JavaScript (v3)**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Ein Kinesis-Ereignis mit Lambda unter Verwendung von. JavaScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Ein Kinesis-Ereignis mit Lambda unter Verwendung von. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK für PHP**  
 Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK für Python (Boto3)**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK für Ruby**  
 Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK für Rust**  
 Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit [Serverless-Beispielen](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**So erstellen Sie die Funktion**

1. Erstellen Sie ein Verzeichnis für das Projekt und wechseln Sie dann zu diesem Verzeichnis.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Kopieren Sie den JavaScript Beispielcode in eine neue Datei mit dem Namen`index.js`.

1. Erstellen Sie ein Bereitstellungspaket.

   ```
   zip function.zip index.js
   ```

1. Erstellen Sie eine Lambda-Funktion mit dem Befehl `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Lambda-Funktion testen
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Rufen Sie Ihre Lambda-Funktion manuell mit dem `invoke` AWS Lambda CLI-Befehl und einem Kinesis-Beispielereignis auf.

**Lambda-Funktion testen**

1. Kopieren Sie das folgende JSON in eine Datei und speichern Sie sie unter `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Verwenden Sie den `invoke`-Befehl, um das Ereignis an die Funktion zu senden.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   Die **cli-binary-format** Option ist erforderlich, wenn Sie Version 2 verwenden AWS CLI . Um dies zur Standardeinstellung zu machen, führen Sie `aws configure set cli-binary-format raw-in-base64-out` aus. Weitere Informationen finden Sie unter [Von AWS CLI unterstützte globale Befehlszeilenoptionen](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) im *AWS Command Line Interface -Benutzerhandbuch für Version 2*.

   Die Antwort wird in gespeicher `out.txt`.

## Kinesis-Stream erstellen
<a name="with-kinesis-example-configure-event-source-create"></a>

Verwenden Sie den Befehl `create-stream `, um einen Stream zu erstellen.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Führen Sie den folgenden `describe-stream`-Befehl aus, um den Stream-ARN zu erhalten.

```
aws kinesis describe-stream --stream-name lambda-stream
```

Die Ausgabe sollte folgendermaßen aussehen:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Sie benötigen im nächsten Schritt den Stream-ARN, um die Lambda-Funktion dem Stream zuzuordnen.

## Fügen Sie eine Ereignisquelle hinzu in AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Führen Sie den Befehl AWS CLI `add-event-source` aus.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Notieren Sie die Zuweisungs-ID zur späteren Verwendung. Sie erhalten eine Liste der Zuweisungen von Ereignisquellen, indem Sie den `list-event-source-mappings`-Befehl ausführen.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

In der Antwort können Sie überprüfen, ob der Statuswert is `enabled`. Ereignisquellen-Zuweisungen können deaktiviert werden, um Abfragen vorübergehend zu pausieren, ohne dass Datensätze verloren gehen.

## Testen der Einrichtung
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Zum Testen der Ereignisquellen-Zuweisung fügen Sie Ihrem Kinesis-Stream Ereignisdatensätze hinzu. Der Wert `--data` ist eine Zeichenfolge, die die CLI vor dem Senden an Kinesis mit base64 verschlüsselt. Der gleiche Befehl kann mehrmals ausgeführt werden, um dem Stream mehrere Datensätze hinzuzufügen.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda verwendet die Ausführungsrolle, um Datensätze aus dem Stream zu lesen. Anschließend wird Ihre Lambda-Funktion aufgerufen und Datensatzbatches werden übergeben. Die Funktion dekodiert Daten aus jedem Datensatz und protokolliert sie, wobei die Ausgabe an CloudWatch Logs gesendet wird. Zeigen Sie die Protokolle in der [CloudWatch -Konsole](https://console.aws.amazon.com/cloudwatch) an.

## Bereinigen Ihrer Ressourcen
<a name="cleanup"></a>

Sie können jetzt die Ressourcen, die Sie für dieses Tutorial erstellt haben, löschen, es sei denn, Sie möchten sie behalten. Durch das Löschen von AWS Ressourcen, die Sie nicht mehr verwenden, vermeiden Sie unnötige Kosten für Ihre AWS-Konto.

**So löschen Sie die Ausführungsrolle**

1. Öffnen Sie die Seite [Roles](https://console.aws.amazon.com/iam/home#/roles) in der IAM-Konsole.

1. Wählen Sie die von Ihnen erstellte Ausführungsrolle aus.

1. Wählen Sie **Löschen** aus.

1. Geben Sie den Namen der Rolle in das Texteingabefeld ein und wählen Sie **Delete** (Löschen) aus.

**So löschen Sie die Lambda-Funktion:**

1. Öffnen Sie die Seite [Funktionen](https://console.aws.amazon.com/lambda/home#/functions) der Lambda-Konsole.

1. Wählen Sie die Funktion aus, die Sie erstellt haben.

1. Wählen Sie **Aktionen**, **Löschen** aus.

1. Geben Sie **confirm** in das Texteingabefeld ein und wählen Sie **Delete** (Löschen) aus.

**So löschen Sie den Kinesis-Stream**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Wählen Sie den von Ihnen erstellten Stream aus

1. Wählen Sie **Aktionen**, **Löschen** aus.

1. Geben Sie **delete** in das Texteingabefeld ein.

1. Wählen Sie **Löschen** aus.