Erfassung verworfener Batches für eine selbstverwaltete Apache Kafka-Ereignisquelle - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erfassung verworfener Batches für eine selbstverwaltete Apache Kafka-Ereignisquelle

Um Datensätze zu fehlgeschlagenen Aufrufen zur Zuordnung von Ereignisquellen beizubehalten, fügen Sie der Zuordnung von Ereignisquellen Ihrer Funktion ein Ziel hinzu. Jeder Datensatz, der an das Ziel gesendet wird, ist ein JSON Dokument mit Metadaten über den fehlgeschlagenen Aufruf. Sie können jedes SNS Amazon-Thema, jede SQS Amazon-Warteschlange oder jeden S3-Bucket als Ziel konfigurieren. Ihre Ausführungsrolle muss über Berechtigungen für das Ziel verfügen:

Sie müssen einen VPC Endpunkt für Ihren Zieldienst bei einem Ausfall in Ihrem Apache Kafka-Cluster bereitstellen. VPC

Wenn Sie einen KMS Schlüssel für Ihr Ziel konfiguriert haben, benötigt Lambda außerdem je nach Zieltyp die folgenden Berechtigungen:

Konfiguration von Zielen bei einem Ausfall für eine selbstverwaltete Apache Kafka-Ereignisquellenzuordnung

Gehen Sie folgendermaßen vor, um ein Ausfallziel mit der Konsole zu konfigurieren:

  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie eine Funktion aus.

  3. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add destination (Ziel hinzufügen).

  4. Wählen Sie als Quelle die Option Aufruf der Zuordnung von Ereignisquellen aus.

  5. Wählen Sie für die Zuordnung von Ereignisquellen eine Ereignisquelle aus, die für diese Funktion konfiguriert ist.

  6. Wählen Sie für Bedingung die Option Bei Ausfall aus. Für Aufrufe zur Zuordnung von Ereignisquellen ist dies die einzig akzeptierte Bedingung.

  7. Wählen Sie unter Zieltyp den Zieltyp aus, an den Lambda Aufrufdatensätze sendet.

  8. Wählen Sie unter Destination (Ziel) eine Ressource aus.

  9. Wählen Sie Save (Speichern) aus.

Sie können auch ein Ziel für den Fall eines Fehlers konfigurieren, indem Sie AWS CLI Mit dem folgenden create-event-source-mappingBefehl wird beispielsweise eine Ereignisquellenzuordnung mit einem Ziel für den Fall eines SQS Fehlers hinzugefügt: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Mit dem folgenden update-event-source-mappingBefehl wird der mit der Eingabe verknüpften Ereignisquelle ein S3-Ziel für den Fall eines Fehlers hinzugefügt: uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Um ein Ziel zu entfernen, geben Sie eine leere Zeichenfolge als Argument für den destination-config-Parameter an:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

SNSund ein SQS Beispiel für einen Aufrufdatensatz

Das folgende Beispiel zeigt, was Lambda bei einem fehlgeschlagenen Kafka-Ereignisquellenaufruf an ein SNS Thema oder ein SQS Warteschlangenziel sendet. Jeder der Schlüssel unter recordsInfo enthält sowohl das Kafka-Thema als auch die Kafka-Partition, getrennt durch einen Bindestrich. Bei dem Schlüssel "Topic-0" handelt es sich beispielsweise bei Topic um das Kafka-Thema und bei 0 um die Partition. Für jedes Thema und jede Partition können Sie die Offsets und Zeitstempeldaten verwenden, um die ursprünglichen Aufrufdatensätze zu finden.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Beispiel für einen S3-Zielaufrufdatensatz

Für S3-Ziele sendet Lambda den gesamten Aufrufdatensatz zusammen mit den Metadaten an das Ziel. Das folgende Beispiel zeigt, was Lambda bei einem fehlgeschlagenen Aufruf der Kafka-Ereignisquelle an ein S3-Bucket-Ziel sendet. Zusätzlich zu allen Feldern aus dem vorherigen Beispiel für SQS und SNS Ziele enthält das payload Feld den ursprünglichen Aufrufdatensatz als JSON Escape-Zeichenfolge.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Tipp

Wir empfehlen außerdem, die S3-Versionsverwaltung in Ihrem Ziel-Bucket zu aktivieren.