Capture des lots rejetés pour une source d'événements Apache Kafka autogérée - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Capture des lots rejetés pour une source d'événements Apache Kafka autogérée

Pour retenir les enregistrements des invocations de mappage de sources d’événements qui ont échoué, ajoutez une destination au mappage des sources d’événements de votre fonction. Chaque enregistrement envoyé à la destination est un JSON document contenant des métadonnées relatives à l'échec de l'invocation. Vous pouvez configurer n'importe quel SNS sujet Amazon, SQS file d'attente Amazon ou compartiment S3 comme destination. Votre rôle d'exécution doit disposer d'autorisations pour la destination :

Vous devez déployer un VPC point de terminaison pour votre service de destination en cas de défaillance dans votre cluster Apache Kafka. VPC

En outre, si vous avez configuré une KMS clé sur votre destination, Lambda a besoin des autorisations suivantes en fonction du type de destination :

Configuration de destinations en cas de panne pour un mappage de sources d'événements Apache Kafka autogéré

Pour configurer une destination en cas de panne à l’aide de la console, procédez comme suit :

  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Invocation du mappage des sources d’événements.

  5. Pour le mappage des sources d’événements, choisissez une source d’événements configurée pour cette fonction.

  6. Pour Condition, sélectionnez En cas d’échec. Pour les invocations de mappage des sources d’événements, il s’agit de la seule condition acceptée.

  7. Pour Type de destination, choisissez le type de destination auquel Lambda envoie les enregistrements d’invocation.

  8. Pour Destination, choisissez une ressource.

  9. Choisissez Save (Enregistrer).

Vous pouvez également configurer une destination en cas de panne à l'aide du AWS CLI. Par exemple, la create-event-source-mappingcommande suivante ajoute un mappage de source d'événement avec une destination SQS en cas de défaillance à MyFunction :

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

La update-event-source-mappingcommande suivante ajoute une destination S3 en cas de défaillance à la source d'événements associée à l'entrée uuid :

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Pour supprimer une destination, entrez une chaîne vide comme argument du paramètre destination-config :

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

SNSet SQS exemple d'enregistrement d'invocation

L'exemple suivant montre ce que Lambda envoie à un SNS sujet ou à une destination de SQS file d'attente en cas d'échec d'un appel de source d'événement Kafka. Chacune des clés sous recordsInfo contient à la fois le sujet et la partition Kafka, séparés par un trait d’union. Par exemple, pour la clé"Topic-0", Topic est la rubrique Kafka, et 0 est la partition. Pour chaque sujet et chaque partition, vous pouvez utiliser les décalages et les données d’horodatage pour trouver les enregistrements d’invocation d’origine.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Exemple d'enregistrement d'appel de destination S3

Pour les destinations S3, Lambda envoie l’intégralité de l’enregistrement d’invocation ainsi que les métadonnées à la destination. L’exemple suivant montre que Lambda envoie vers une destination de compartiment S3 en cas d’échec d’une invocation de source d’événement Kafka. Outre tous les champs de l'exemple précédent pour SQS et pour les SNS destinations, le payload champ contient l'enregistrement d'appel d'origine sous forme de JSON chaîne échappée.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Astuce

Nous vous recommandons d’activer la gestion des versions S3 sur votre compartiment de destination.