Traitement des messages Apache Kafka autogérés avec Lambda - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Traitement des messages Apache Kafka autogérés avec Lambda

Note

Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Ajout d’un cluster Kafka en tant que source d’événement

Pour créer un mappage de source d’événement, ajoutez votre cluster Kafka en tant que déclencheur de fonction Lambda à l’aide de la console Lambda, d’un kit SDK AWS, ou de l’AWS Command Line Interface (AWS CLI).

Cette section explique comment créer un mappage de source d’événement à l’aide de la console Lambda et de l’ AWS CLI.

Prérequis

  • Cluster Apache Kafka autogéré. Lambda prend en charge Apache Kafka versions 0.10.1.0 et ultérieures.

  • Rôle d'exécution autorisé à accéder aux AWS ressources utilisées par votre cluster Kafka autogéré.

Identifiant de groupe de consommateurs personnalisable

Lorsque vous configurez Kafka comme source d’événements, vous pouvez spécifier un identifiant de groupe de consommateurs. Cet identifiant de groupe de consommateurs est un identifiant existant pour le groupe de clients Kafka auquel vous souhaitez rattacher votre fonction Lambda. Vous pouvez utiliser cette fonction pour migrer facilement toutes les configurations de traitement d’enregistrements Kafka en cours depuis d’autres clients vers Lambda.

Si vous spécifiez un identifiant de groupe de consommateurs et qu’il existe d’autres sondeurs actifs au sein de ce groupe de consommateurs, Kafka distribue des messages à tous les consommateurs. En d’autres termes, Lambda ne reçoit pas l’intégralité du message relatif au sujet Kafka. Si vous souhaitez que Lambda gère tous les messages de la rubrique, désactivez tous les autres sondeurs de ce groupe de consommateurs.

De plus, si vous spécifiez un identifiant de groupe de consommateurs et que Kafka trouve un groupe de consommateurs existant valide avec le même identifiant, Lambda ignore le paramètre StartingPosition pour le mappage des sources d’événements. Lambda commence plutôt à traiter les enregistrements en fonction de la compensation engagée par le groupe de consommateurs. Si vous spécifiez un identifiant de groupe de consommateurs et que Kafka ne trouve aucun groupe de consommateurs existant, Lambda configure votre source d’événement avec le StartingPosition spécifié.

L’identifiant du groupe de consommateurs que vous spécifiez doit être unique parmi toutes vos sources d’événements Kafka. Après avoir créé un mappage de sources d’événements Kafka avec l’identifiant de groupe de consommateurs spécifié, vous ne pouvez plus mettre à jour cette valeur.

Ajout d’un cluster Kafka autogéré (console)

Pour ajouter votre cluster Apache Kafka autogéré et une rubrique Kafka en tant que déclencheur pour votre fonction Lambda, procédez comme suit.

Pour ajouter un déclencheur Apache Kafka à votre fonction Lambda (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom de votre fonction Lambda.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

  4. Sous Trigger configuration (Configuration du déclencheur), procédez comme suit :

    1. Choisissez le type de déclencheur Apache Kafka.

    2. Pour Bootstrap servers (Serveurs d’amorçage), entrez l’adresse de paire hôte et port d’un agent Kafka dans votre cluster, puis choisissez Add (Ajouter). Répétez l’opération pour chaque agent Kafka dans le cluster.

    3. Pour Topic name (Nom de rubrique), entrez le nom de la rubrique Kafka utilisée pour stocker les registres dans le cluster.

    4. (Facultatif) Pour Batch size (Taille de lot), entrez le nombre maximal de registres à recevoir dans un même lot.

    5. Pour Batch window, veuillez saisir l’intervalle de temps maximal en secondes nécessaire à Lambda pour collecter des enregistrements avant d’invoquer la fonction.

    6. (Facultatif) Pour l’identifiant de groupe de consommateurs, entrez l’identifiant d’un groupe de consommateurs Kafka à rejoindre.

    7. (Facultatif) Pour Position de départ, choisissez Dernier pour commencer à lire le flux à partir du dernier enregistrement, Supprimer l’horizon pour commencer au premier enregistrement disponible ou À l’horodatage pour spécifier un horodatage à partir duquel commencer la lecture.

    8. (Facultatif) Pour VPC, choisissez l’Amazon VPC pour votre cluster Kafka. Ensuite, choisissez le VPC subnets (Sous-réseaux VPC) et les VPC security groups (Groupes de sécurité VPC).

      Ce paramètre est requis si seuls des utilisateurs au sein de votre VPC accèdent à vos courtiers.

    9. (Facultatif) Pour Authentication (Authentification), choisissez Add (Ajouter), puis procédez comme suit :

      1. Choisissez le protocole d’accès ou d’authentification des courtiers Kafka dans votre cluster.

        • Si votre agent Kafka utilise l’authentification SASL/PLAIN, choisissez BASIC_AUTH.

        • Si votre courtier utilise l'authentification SASL/SCRAM, choisissez l'un des protocoles. SASL_SCRAM

        • Si vous configurez l’authentification mTLS, choisissez le protocole CLIENT_CERTIFICATE_TLS_AUTH.

      2. Pour l’authentification SASL/SCRAM ou mTLS, choisissez le nom de la clé secrète Secrets Manager contenant les informations d’identification de votre cluster Kafka.

    10. (Facultatif) Pour Encryption (Chiffrement), choisissez le secret Secrets Manager contenant le certificat d’autorité de certification racine que vos courtiers Kafka utilisent pour le chiffrement TLS, si vos courtiers Kafka utilisent des certificats signés par une autorité de certification privée.

      Ce paramètre s'applique au chiffrement TLS pour et à SASL/SCRAM or SASL/PLAIN l'authentification mTLS.

    11. Pour créer le déclencheur dans un état désactivé pour le test (recommandé), désactivez Enable trigger (Activer le déclencheur). Ou, pour activer le déclencheur immédiatement, sélectionnezActiver un déclencheur.

  5. Pour créer le déclencheur, choisissez Add (Ajouter).

Ajout d’un cluster Kafka autogéré (AWS CLI)

Utilisez les exemples de AWS CLI commandes suivants pour créer et afficher un déclencheur Apache Kafka autogéré pour votre fonction Lambda.

Utilisation de SASL/SCRAM

Si des utilisateurs de Kafka accèdent à vos courtiers Kafka via Internet, spécifiez le secret Secrets Manager que vous avez créé pour l’authentification SASL/SCRAM. L'exemple suivant utilise la create-event-source-mapping AWS CLI commande pour mapper une fonction Lambda nommée my-kafka-function à une rubrique Kafka nommée. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Utilisation d’un VPC

Si seuls des utilisateurs de Kafka au sein de votre VPC accèdent à vos agents Kafka, vous devez spécifier votre VPC, vos sous-réseaux et votre groupe de sécurité de VPC. L'exemple suivant utilise la create-event-source-mapping AWS CLI commande pour mapper une fonction Lambda nommée my-kafka-function à une rubrique Kafka nommée. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Affichage de l'état à l'aide du AWS CLI

L'exemple suivant utilise la get-event-source-mapping AWS CLI commande pour décrire l'état du mappage des sources d'événements que vous avez créé.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Paramètres de configuration Apache Kafka autogérés

Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMappinget les mêmes opérations d'UpdateEventSourceMappingAPI. Cependant, seuls certains paramètres s’appliquent à Apache Kafka.

Paramètre Obligatoire Par défaut Remarques

BatchSize

N

100

Maximum : 10 000.

DestinationConfig

N

N/A

Capture de lots supprimés pour une source d’événement Apache Kafka autogérée

Activées

N

True

FilterCriteria

N

N/A

Contrôle des événements envoyés par Lambda à votre fonction

FunctionName

Y

N/A

KMSKeyArn

N

N/A

Chiffrement des critères de filtre

MaximumBatchingWindowInSeconds

N

500 ms

Comportement de traitement par lots

ProvisionedPollersConfig

N

MinimumPollers : la valeur par défaut, si elle n’est pas spécifiée, est de 1

MaximumPollers : la valeur par défaut, si elle n’est pas spécifiée, est de 200

Configuration du mode alloué

SelfManagedEventSource

Y

N/A

Liste des agents Kafka. Peut définir uniquement sur Create (Créer)

SelfManagedKafkaEventSourceConfig

N

Contient le ConsumerGroupId champ qui prend par défaut une valeur unique.

Peut définir uniquement sur Create (Créer)

SourceAccessConfigurations

N

Pas d’informations d’identification

Informations sur le VPC ou informations d’authentification pour le cluster

Pour SASL_PLAIN, défini sur BASIC_AUTH

StartingPosition

Y

N/A

AT_TIMESTAMP, TRIM_HORIZON ou DERNIER

Peut définir uniquement sur Create (Créer)

StartingPositionTimestamp

N

N/A

Obligatoire s'il StartingPosition est défini sur AT_TIMESTAMP

Balises

N

N/A

Utilisation des balises dans les mappages des sources d’événements

Rubriques

Y

N/A

Nom de la rubrique

Peut définir uniquement sur Create (Créer)

Utilisation d’un cluster Kafka en tant que source d’événement

Lorsque vous ajoutez votre cluster Apache Kafka ou Amazon MSK comme déclencheur pour votre fonction Lambda, le cluster est utilisé comme source d’événement.

Lambda lit les données d'événements des sujets Kafka que vous spécifiez Topics dans une CreateEventSourceMappingdemande, en fonction de StartingPosition ce que vous spécifiez. Lorsque le traitement a réussi, votre rubrique Kafka est validée dans votre cluster Kafka.

Si vous spécifiez StartingPosition comme LATEST, Lambda commence à lire à partir du dernier message dans chaque partition de la rubrique. Un certain temps pouvant s’écouler après la configuration du déclencheur avant que Lambda commence à lire les messages, Lambda ne lit aucun message produit pendant cette fenêtre de temps.

Lambda traite les registres d’une ou plusieurs partitions de rubrique Kafka que vous spécifiez et envoie une charge utile JSON à votre fonction Lambda. Une seule charge utile Lambda peut contenir des messages provenant de plusieurs partitions. Lorsque d'autres enregistrements sont disponibles, Lambda continue de traiter les enregistrements par lots, en fonction de la BatchSize valeur que vous spécifiez dans une CreateEventSourceMappingdemande, jusqu'à ce que votre fonction aborde le sujet.

Si votre fonction renvoie une erreur pour l’un des messages d’un lot, Lambda réessaie le lot de messages complet jusqu’à ce que le traitement réussisse ou que les messages expirent. Vous pouvez envoyer les enregistrements qui échouent à toutes les tentatives vers une destination en cas de panne pour un traitement ultérieur.

Note

Alors que les fonctions Lambda ont généralement un délai d’expiration maximal de 15 minutes, les mappages des sources d’événement pour Amazon MSK, Apache Kafka autogéré, Amazon DocumentDB et Amazon MQ pour ActiveMQ et RabbitMQ ne prennent en charge que les fonctions dont le délai d’expiration maximal est de 14 minutes. Cette contrainte garantit que le mappage des sources d’événements peut gérer correctement les erreurs de fonction et effectuer de nouvelles tentatives.

Positions de départ des interrogations et des flux

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

Comportement de mise à l’échelle du débit des messages pour les mappages de sources d’événement Apache Kafka autogérés

Vous pouvez choisir entre deux modes de comportement de mise à l’échelle du débit des messages pour le mappage des sources d’événements Amazon MSK :

Mode par défaut (à la demande)

Lorsque vous créez initialement une source d’événement Apache Kafka autogérée, Lambda alloue un nombre de sondeurs d’événements par défaut pour traiter toutes les partitions de la rubrique Kafka. Lambda augmente ou diminue automatiquement le nombre de sondeurs d’événements, en fonction de la charge de messages.

Toutes les minutes, Lambda évalue le décalage de consommateurs de toutes les partitions dans la rubrique. Si le décalage est trop élevé, la partition reçoit des messages plus rapidement que Lambda ne peut les traiter. Si nécessaire, Lambda ajoute ou supprime des sondeurs d’événements dans la rubrique. Cette mise à l’échelle automatique consistant à ajouter ou à supprimer des sondeurs d’événements a lieu dans les trois minutes suivant l’évaluation.

Si votre fonction Lambda cible est limitée, Lambda réduit le nombre de sondeurs d’événements. Cette action réduit la charge de travail de la fonction en diminuant le nombre de messages que les sondeurs d’événements peuvent échanger avec la fonction.

Pour surveiller le débit de votre rubrique Kafka, vous pouvez afficher les métriques de consommateurs Apache Kafka, telles que consumer_lag et consumer_offset.

Configuration du mode alloué

Pour les charges de travail où vous devez optimiser le débit de votre mappage des sources d’événements, vous pouvez utiliser le mode provisionné. En mode alloué, vous définissez des limites minimales et maximales pour le nombre de sondeurs d’événements alloués. Ces sondeurs d’événements alloués sont dédiés à votre mappage des sources d’événements et peuvent gérer les pics de messages inattendus de manière instantanée lorsqu’ils se produisent. Nous vous recommandons d’utiliser le mode alloué pour les charges de travail Kafka soumises à des exigences de performance strictes.

Dans Lambda, un sondeur d'événements est une unité de calcul capable de gérer jusqu'à 5 MBps  % du débit. À titre de référence, supposons que votre source d’événement produise des données utiles moyennes de 1 Mo et que la durée d’exécution moyenne des fonctions soit de 1 seconde. Si la charge utile ne subit aucune transformation (telle que le filtrage), un seul interrogateur peut prendre en charge 5 MBps débits et 5 appels Lambda simultanés. L’utilisation du mode alloué génère des coûts supplémentaires. Pour les estimations de prix, consultez la Tarification d’AWS Lambda.

En mode alloué, la plage de valeurs acceptées pour le nombre minimal de sondeurs d’événements (MinimumPollers) est comprise entre 1 et 200 inclus. La plage de valeurs acceptées pour le nombre maximal de sondeurs d’événements (MaximumPollers) est comprise entre 1 et 2 000 inclus. MaximumPollers doit être supérieur ou égal à MinimumPollers. En outre, pour maintenir un traitement ordonné au sein des partitions, Lambda limite le nombre de MaximumPollers au nombre de partitions dans la rubrique.

Pour plus de détails sur le choix des valeurs appropriées pour le nombre minimal et maximal de sondeurs d’événements, consultez Bonnes pratiques et considérations lors de l’utilisation du mode provisionné.

Vous pouvez configurer le mode alloué pour le mappage des sources d’événements Apache Kafka autogéré à l’aide de la console ou de l’API Lambda.

Pour configurer le mode alloué pour un mappage des sources d’événements Apache Kafka autogéré existant (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez la fonction avec le mappage des sources d’événements Apache Kafka autogéré pour laquelle vous souhaitez configurer le mode alloué.

  3. Choisissez Configuration, puis Déclencheurs.

  4. Choisissez le mappage des sources d’événements Apache Kafka autogéré pour lequel vous souhaitez configurer le mode alloué, puis choisissez Modifier.

  5. Sous Configuration du mappage des sources d’événements, choisissez Configurer le mode provisionné.

    • Pour le Nombre minimal de sondeurs d’événements, saisissez une valeur comprise entre 1 et 200. Si vous ne spécifiez aucune valeur, Lambda choisit la valeur par défaut 1.

    • Pour le Nombre maximal de sondeurs d’événements, saisissez une valeur comprise entre 1 et 2 000. Cette valeur doit être supérieure ou égale à la valeur du Nombre minimal de sondeurs d’événements. Si vous ne spécifiez aucune valeur, Lambda choisit la valeur par défaut 200.

  6. Choisissez Save (Enregistrer).

Vous pouvez configurer le mode provisionné par programmation à l'aide de l'ProvisionedPollerConfigobjet de votre. EventSourceMappingConfiguration Par exemple, la commande UpdateEventSourceMappingCLI suivante configure une MinimumPollers valeur de 5 et une MaximumPollers valeur de 100.

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'

Après avoir configuré le mode alloué, vous pouvez observer l’utilisation des sondeurs d’événements pour votre charge de travail en surveillant la métrique ProvisionedPollers. Pour de plus amples informations, veuillez consulter Métriques de mappage des sources d’événements.

Pour désactiver le mode provisionné et revenir au mode par défaut (à la demande), vous pouvez utiliser la commande UpdateEventSourceMappingCLI suivante :

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{}'

Bonnes pratiques et considérations lors de l’utilisation du mode provisionné

La configuration optimale du nombre minimal et maximal de sondeurs d’événements pour votre mappage des sources d’événements dépend des exigences de performances de votre application. Nous vous recommandons de commencer avec le nombre minimal de sondeurs d’événéments par défaut afin de définir le profil de performances. Ajustez votre configuration en fonction des modèles de traitement des messages observés et du profil de performances souhaité.

Pour les charges de travail associées à des pics de trafic et à des exigences de performances strictes, augmentez le nombre minimal de sondeurs d’événements de manière à gérer les pics soudains de messages. Pour déterminer le nombre minimal de sondeurs d'événements requis, prenez en compte le nombre de messages par seconde de votre charge de travail et la taille moyenne de la charge utile, et utilisez la capacité de débit d'un seul sondeur d'événements (jusqu'à 5 MBps) comme référence.

Pour maintenir un traitement ordonné au sein d’une partition, Lambda limite le nombre maximal de sondeurs d’événements au nombre de partitions dans la rubrique. En outre, le nombre maximal de sondeurs d’événements auxquels votre mappage des sources d’événements peut être mis à l’échelle dépend des paramètres de simultanéité de la fonction.

Lorsque vous activez le mode provisionné, mettez à jour vos paramètres réseau pour supprimer les points de terminaison AWS PrivateLink VPC et les autorisations associées.

CloudWatch Métriques Amazon

Lambda émet la métrique OffsetLag pendant que votre fonction traite les registres. La valeur de cette métrique est la différence de décalage entre le dernier enregistrement inscrit dans la rubrique source de l’événement Kafka et le dernier enregistrement traité par le groupe de consommateurs de votre fonction. Vous pouvez utiliser OffsetLag pour estimer la latence entre le moment où un enregistrement est ajouté et celui où votre groupe de consommateurs le traite.

Une tendance à la hausse de OffsetLag peut indiquer des problèmes liés aux sondeurs dans le groupe de consommateurs de votre fonction. Pour de plus amples informations, veuillez consulter Utilisation de CloudWatch métriques avec Lambda.