Capture de lots supprimés pour une source d’événement Apache Kafka autogérée - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Capture de lots supprimés pour une source d’événement Apache Kafka autogérée

Pour retenir les enregistrements des invocations de mappage de sources d’événements qui ont échoué, ajoutez une destination au mappage des sources d’événements de votre fonction. Chaque enregistrement envoyé à la destination est un document JSON contenant les métadonnées sur l’invocation ayant échoué. Pour les destinations Amazon S3, Lambda envoie également l’intégralité de l’enregistrement d’invocation avec les métadonnées. Vous pouvez configurer n’importe quelle rubrique Amazon SNS, n’importe quelle file d’attente Amazon SQS ou n’importe quel compartiment S3 comme destination.

Avec les destinations Amazon S3, vous pouvez utiliser la fonctionnalité Notifications d’événements Amazon S3 pour recevoir des notifications lorsque des objets sont chargés dans votre compartiment S3 de destination. Vous pouvez également configurer les notifications d’événements S3 pour invoquer une autre fonction Lambda afin d’effectuer un traitement automatique des lots ayant échoué.

Votre rôle d’exécution doit disposer d’autorisations pour la destination :

Vous devez déployer un point de terminaison de VPC pour votre service de destination en cas de panne au sein de votre VPC de cluster Apache Kafka.

En outre, si vous avez configuré une clé KMS sur votre destination, Lambda a besoin des autorisations suivantes en fonction du type de destination :

  • Si vous avez activé le chiffrement avec votre propre clé KMS pour une destination S3, KMS:GenerateDataKey est requis. Si la clé KMS et la destination du compartiment S3 se trouvent dans un compte différent de celui de votre fonction Lambda et de votre rôle d’exécution, configurez la clé KMS pour qu’elle approuve le rôle d’exécution afin d’autoriser KMS:GenerateDataKey.

  • Si vous avez activé le chiffrement avec votre propre clé KMS pour une destination SQS, kms:Decrypt et kms:GenerateDataKey sont requis. Si la clé KMS et la destination de la file d’attente SQS se trouvent dans un compte différent de votre fonction Lambda et de votre rôle d’exécution, configurez la clé KMS pour qu’elle fasse confiance au rôle d’exécution afin d’autoriser kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey et kms:ReEncrypt.

  • Si vous avez activé le chiffrement avec votre propre clé KMS pour une destination SNS, kms:Decrypt et kms:GenerateDataKey sont requis. Si la clé KMS et la destination de la rubrique SNS se trouvent dans un compte différent de votre fonction Lambda et de votre rôle d’exécution, configurez la clé KMS pour qu’elle fasse confiance au rôle d’exécution afin d’autoriser kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey et kms:ReEncrypt.

Configuration de destinations en cas de panne pour un mappage des sources d’événements Apache Kafka autogéré

Pour configurer une destination en cas de panne à l’aide de la console, procédez comme suit :

  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Invocation du mappage des sources d’événements.

  5. Pour le mappage des sources d’événements, choisissez une source d’événements configurée pour cette fonction.

  6. Pour Condition, sélectionnez En cas d’échec. Pour les invocations de mappage des sources d’événements, il s’agit de la seule condition acceptée.

  7. Pour Type de destination, choisissez le type de destination auquel Lambda envoie les enregistrements d’invocation.

  8. Pour Destination, choisissez une ressource.

  9. Choisissez Save (Enregistrer).

Vous pouvez également configurer une destination en cas de panne à l’aide de l’AWS CLI. Par exemple, la commande create-event-source-mapping suivante ajoute un mappage des sources d’événements avec une destination SQS en cas de panne à MyFunction :

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

La commande update-event-source-mapping ajoute une destination S3 en cas de panne à la source d’événements associée à l’entrée : uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Pour supprimer une destination, entrez une chaîne vide comme argument du paramètre destination-config :

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Pratiques exemplaires en matière de sécurité pour les destinations Amazon S3

La suppression d’un compartiment S3 configuré comme destination sans supprimer la destination de la configuration de votre fonction peut engendrer un risque de sécurité. Si un autre utilisateur connaît le nom de votre compartiment de destination, il peut recréer le compartiment dans son Compte AWS. Les enregistrements des invocations ayant échoué seront envoyés dans son compartiment, exposant potentiellement les données de votre fonction.

Avertissement

Pour vous assurer que les enregistrements d’invocation de votre fonction ne peuvent pas être envoyés vers un compartiment S3 d’un autre Compte AWS, ajoutez une condition au rôle d’exécution de votre fonction qui limite les autorisations s3:PutObject aux seuls compartiments de votre compte.

L’exemple suivant présente une politique IAM qui limite les autorisations s3:PutObject de votre fonction aux seuls compartiments de votre compte. Cette politique donne également à Lambda l’autorisation s3:ListBucket dont il a besoin pour utiliser un compartiment S3 comme destination.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3BucketResourceAccountWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:ListBucket" ], "Resource": "arn:aws:s3:::*/*", "Condition": { "StringEquals": { "s3:ResourceAccount": "111122223333" } } } ] }

Pour ajouter une politique d’autorisations au rôle d’exécution de votre fonction à l’aide de l’AWS Management Console ou de l’AWS CLI, reportez-vous aux instructions des procédures suivantes :

Console
Pour ajouter une politique d’autorisations au rôle d’exécution d’une fonction (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Sélectionnez la fonction Lambda dont vous voulez modifier le rôle d’exécution.

  3. Sous l’onglet Configuration, sélectionnez Autorisations.

  4. Sous l’onglet Rôle d’exécution, sélectionnez le Nom du rôle de votre fonction pour ouvrir la page de console IAM du rôle.

  5. Ajoutez une politique d’autorisations de au rôle en procédant comme suit :

    1. Dans le volet Politiques d’autorisations, choisissez Ajouter des autorisations, puis Créer une politique en ligne.

    2. Dans l’Éditeur de politique, sélectionnez JSON.

    3. Collez la politique que vous souhaitez ajouter dans l’éditeur (en remplacement du JSON existant), puis choisissez Suivant.

    4. Sous Détails de la politique, saisissez un Nom de la politique.

    5. Choisissez Create Policy (Créer une politique).

AWS CLI
Pour ajouter une politique d’autorisations au rôle d’exécution d’une fonction (CLI)
  1. Créez un document de politique JSON avec les autorisations requises et enregistrez-le dans un répertoire local.

  2. Utilisez la commande put-role-policy de la CLI IAM pour ajouter des autorisations au rôle d’exécution de votre fonction. Exécutez la commande suivante depuis le répertoire dans lequel vous avez enregistré votre document de politique JSON et remplacez le nom du rôle, le nom de la politique et le document de politique par vos propres valeurs.

    aws iam put-role-policy \ --role-name my_lambda_role \ --policy-name LambdaS3DestinationPolicy \ --policy-document file://my_policy.json

Exemple d’enregistrement d’invocation SNS et SQS

L’exemple suivant montre ce que Lambda envoie à une rubrique SNS ou à une destination de file d’attente SQS pour une invocation de source d’événement Kafka qui a échoué. Chacune des clés sous recordsInfo contient à la fois le sujet et la partition Kafka, séparés par un trait d’union. Par exemple, pour la clé"Topic-0", Topic est la rubrique Kafka, et 0 est la partition. Pour chaque sujet et chaque partition, vous pouvez utiliser les décalages et les données d’horodatage pour trouver les enregistrements d’invocation d’origine.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Exemple d’enregistrement d’invocation de destination S3

Pour les destinations S3, Lambda envoie l’intégralité de l’enregistrement d’invocation ainsi que les métadonnées à la destination. L’exemple suivant montre que Lambda envoie vers une destination de compartiment S3 en cas d’échec d’une invocation de source d’événement Kafka. Outre tous les champs de l’exemple précédent pour les destinations SQS et SNS, le champ payload contient l’enregistrement d’invocation d’origine sous forme de chaîne JSON échappée.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Astuce

Nous vous recommandons d’activer la gestion des versions S3 sur votre compartiment de destination.