Tutoriel : Utilisation d’un mappage des sources d’événements Amazon MSK pour invoquer une fonction Lambda - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Tutoriel : Utilisation d’un mappage des sources d’événements Amazon MSK pour invoquer une fonction Lambda

Dans ce tutoriel, vous exécuterez les étapes suivantes :

  • Créez une fonction Lambda dans le même AWS compte qu'un cluster Amazon MSK existant.

  • Configurer le réseau et l’authentification pour que Lambda communique avec Amazon MSK.

  • Configurer un mappage des sources d’événements Amazon MSK Lambda, qui exécute votre fonction Lambda lorsque des événements apparaissent dans la rubrique.

Une fois ces étapes terminées, vous pouvez configurer une fonction Lambda pour traiter automatiquement les événements envoyés à Amazon MSK avec votre code Lambda personnalisé.

Que pouvez-vous faire avec cette fonctionnalité ?

Exemple de solution : Utiliser un mappage des sources d’événements MSK pour fournir des résultats en direct à vos clients.

Imaginons le scénario suivant : votre entreprise héberge une application Web dans laquelle vos clients peuvent consulter des informations sur des événements en direct, tels que des matchs de sport. Les informations actualisées du jeu sont fournies à votre équipe via une rubrique Kafka sur Amazon MSK. Vous souhaitez concevoir une solution qui utilise les mises à jour issues de la rubrique MSK afin de fournir une vue actualisée de l’événement en direct aux clients au sein d’une application que vous développez. Vous avez opté pour l’approche de conception suivante : vos applications clientes communiqueront avec un dorsal sans serveur hébergé dans AWS. Les clients se connecteront via des sessions websocket à l'aide de l'API Amazon WebSocket API Gateway.

Dans cette solution, vous avez besoin d’un composant qui lit les événements MSK, exécute une logique personnalisée pour préparer ces événements pour la couche application, puis transmet ces informations à l’API API Gateway. Vous pouvez implémenter ce composant en fournissant votre logique personnalisée dans une fonction Lambda, puis en l'appelant à l'aide d'un mappage de source d'événements AWS Lambda Amazon MSK. AWS Lambda

Pour plus d'informations sur la mise en œuvre de solutions à l'aide de l'API Amazon WebSocket API Gateway, consultez les WebSocket didacticiels sur les API dans la documentation d'API Gateway.

Prérequis

Un AWS compte avec les ressources préconfigurées suivantes :

Pour remplir ces prérequis, nous vous recommandons de suivre Get started using Amazon MSK dans la documentation Amazon MSK.

  • Un cluster Amazon MSK. Consultez Create an Amazon MSK cluster dans Getting started using Amazon MSK.

  • La configuration suivante :

    • Assurez-vous que l’authentification basée sur les rôles IAM est Activée dans les paramètres de sécurité de votre cluster. Cela améliore votre sécurité en limitant votre fonction Lambda à l’accès aux ressources Amazon MSK nécessaires uniquement. L’authentification basée sur les rôles IAM est activée par défaut sur les nouveaux clusters Amazon MSK.

    • Assurez-vous que l’Accès public est désactivé dans les paramètres réseau de votre cluster. Restreindre l’accès à Internet de votre cluster Amazon MSK améliore votre sécurité en limitant le nombre d’intermédiaires qui traitent vos données. L’accès public est activé par défaut sur les nouveaux clusters Amazon MSK.

  • Une rubrique Kafka dans votre cluster Amazon MSK à utiliser pour cette solution. Consultez Create a topic dans Getting started using Amazon MSK.

  • Un hôte administrateur Kafka configuré pour récupérer les informations de votre cluster Kafka et envoyer des événements Kafka à votre sujet à des fins de test, par exemple une EC2 instance Amazon sur laquelle la CLI d'administration Kafka et la bibliothèque Amazon MSK IAM sont installées. Consultez Create a client machine dans Getting started using Amazon MSK.

Une fois que vous avez configuré ces ressources, collectez les informations suivantes à partir de votre AWS compte pour confirmer que vous êtes prêt à continuer.

  • Le nom de votre cluster Amazon MSK. Vous pouvez trouver cette information dans la console Amazon MSK.

  • L’UUID du cluster, qui fait partie de l’ARN de votre cluster Amazon MSK, que vous pouvez trouver dans la console Amazon MSK. Suivez les procédures décrites dans la rubrique Listing clusters de la documentation Amazon MSK pour trouver cette information.

  • Les groupes de sécurité associés à votre cluster Amazon MSK. Vous pouvez trouver cette information dans la console Amazon MSK. Dans les étapes suivantes, appelez-les vosclusterSecurityGroups.

  • L’ID du VPC Amazon contenant votre cluster Amazon MSK. Vous pouvez trouver cette information en identifiant les sous-réseaux associés à votre cluster Amazon MSK dans la console Amazon MSK, puis en identifiant le VPC Amazon associé au sous-réseau dans la console Amazon VPC.

  • Le nom de la rubrique Kafka utilisée dans votre solution. Vous pouvez trouver cette information en appelant votre cluster Amazon MSK à l’aide de la CLI topics Kafka depuis votre hôte administrateur Kafka. Pour plus d’informations sur la CLI de rubriques, consultez la section Adding and removing topics dans la documentation Kafka.

  • Le nom d’un groupe de consommateurs pour votre rubrique Kafka, adapté à une utilisation par votre fonction Lambda. Ce groupe peut être créé automatiquement par Lambda. Vous n’avez donc pas besoin de le créer avec la CLI Kafka. Si vous devez gérer vos groupes de consommateurs, pour en savoir plus sur la CLI de groupes de consommateurs, consultez la rubrique Managing Consumer Groups dans la documentation Kafka.

Les autorisations suivantes dans votre AWS compte :

  • L’autorisation de créer et de gérer une fonction Lambda.

  • L’autorisation de créer des politiques IAM et de les associer à votre fonction Lambda.

  • L’autorisation de créer des points de terminaison de VPC Amazon et de modifier la configuration réseau dans le VPC Amazon hébergeant votre cluster Amazon MSK.

Configurer la connectivité réseau pour que Lambda communique avec Amazon MSK

AWS PrivateLink À utiliser pour connecter Lambda et Amazon MSK. Pour ce faire, vous créez des points de terminaison de VPC Amazon d’interface dans la console Amazon VPC. Pour plus d’informations sur la configuration réseau, consultez Configurer la sécurité réseau.

Lorsqu’un mappage des sources d’événements Amazon MSK s’exécute pour le compte d’une fonction Lambda, il endosse le rôle d’exécution de la fonction Lambda. Ce rôle IAM autorise le mappage pour accéder aux ressources sécurisées par IAM, telles que votre cluster Amazon MSK. Bien que les composants partagent un rôle d’exécution, le mappage Amazon MSK et votre fonction Lambda ont des exigences de connectivité distinctes pour leurs tâches respectives, comme le montre le schéma suivant.

Une fonction Lambda interroge un cluster et communique avec Lambda en utilisant. AWS STS

Votre mappage des sources d’événements appartient au groupe de sécurité de votre cluster Amazon MSK. Au cours de cette étape de mise en réseau, créez des points de terminaison de VPC Amazon à partir de votre VPC de cluster Amazon MSK pour connecter le mappage des sources d’événements aux services Lambda et STS. Sécurisez ces points de terminaison pour accepter le trafic provenant du groupe de sécurité de votre cluster Amazon MSK. Ajustez ensuite les groupes de sécurité du cluster Amazon MSK pour permettre au mappage des sources d’événements de communiquer avec le cluster Amazon MSK.

Vous pouvez configurer les étapes suivantes à l’aide de l’ AWS Management Console.

Pour configurer les points de terminaison de VPC Amazon d’interface afin de connecter Lambda et Amazon MSK
  1. Créez un groupe de sécurité pour les points de terminaison Amazon VPC de votre interfaceendpointSecurityGroup, qui autorise le trafic TCP entrant sur 443 depuis. clusterSecurityGroups Suivez la procédure décrite dans Créer un groupe de sécurité dans la EC2 documentation Amazon pour créer un groupe de sécurité. Suivez ensuite la procédure décrite dans Ajouter des règles à un groupe de sécurité dans la EC2 documentation Amazon pour ajouter les règles appropriées.

    Créez un groupe de sécurité avec les informations suivantes :

    Lorsque vous ajoutez vos règles de trafic entrant, créez une règle pour chaque groupe de sécurité dansclusterSecurityGroups. Pour chaque règle :

    • Dans le champ Type, sélectionnez HTTPS.

    • Pour Source, sélectionnez l'un desclusterSecurityGroups.

  2. Créez un point de terminaison connectant le service Lambda au VPC Amazon qui contient votre cluster Amazon MSK. Suivez la procédure décrite dans Create an interface endpoint.

    Créez un point de terminaison d’interface avec les informations suivantes :

    • Dans Nom du servicecom.amazonaws.regionName.lambda, sélectionnez où regionName héberge votre fonction Lambda.

    • Pour VPC, sélectionnez le VPC Amazon contenant votre cluster Amazon MSK.

    • Pour les groupes de sécuritéendpointSecurityGroup, sélectionnez ceux que vous avez créés précédemment.

    • Pour Sous-réseaux, sélectionnez les sous-réseaux qui hébergent votre cluster Amazon MSK.

    • Pour Politique, fournissez le document de politique suivant, qui sécurise le point de terminaison afin qu’il soit utilisé par le principal de service Lambda pour l’action lambda:InvokeFunction.

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • Assurez-vous que Activer le nom DNS reste défini.

  3. Créez un point de terminaison connectant le AWS STS service au Amazon VPC contenant votre cluster Amazon MSK. Suivez la procédure décrite dans Create an interface endpoint.

    Créez un point de terminaison d’interface avec les informations suivantes :

    • Pour Nom du service, sélectionnez AWS STS.

    • Pour VPC, sélectionnez le VPC Amazon contenant votre cluster Amazon MSK.

    • Pour les groupes de sécurité, sélectionnezendpointSecurityGroup.

    • Pour Sous-réseaux, sélectionnez les sous-réseaux qui hébergent votre cluster Amazon MSK.

    • Pour Politique, fournissez le document de politique suivant, qui sécurise le point de terminaison afin qu’il soit utilisé par le principal de service Lambda pour l’action sts:AssumeRole.

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • Assurez-vous que Activer le nom DNS reste défini.

  4. Pour chaque groupe de sécurité associé à votre cluster Amazon MSK, c'est-à-dire dansclusterSecurityGroups, autorisez ce qui suit :

    • Autorisez tout le trafic TCP entrant et sortant sur le 9098 à tousclusterSecurityGroups, y compris à l'intérieur de celui-ci.

    • Autorisez tout le trafic TCP sortant sur le port 443.

    Une partie de ce trafic est autorisée par les règles des groupes de sécurité par défaut. Par conséquent, si votre cluster est attaché à un seul groupe de sécurité et que ce groupe possède des règles par défaut, il n’est pas nécessaire d’ajouter des règles. Pour ajuster les règles des groupes de sécurité, suivez les procédures décrites dans Ajouter des règles à un groupe de sécurité dans la EC2 documentation Amazon.

    Ajoutez des règles à vos groupes de sécurité avec les informations suivantes :

    • Pour chaque règle entrante ou sortante pour le port 9098, indiquez

      • Pour Type, sélectionnez Custom TCP (TCP personnalisé).

      • Pour Plage de ports, indiquez 9098.

      • Pour Source, indiquez l'un desclusterSecurityGroups.

    • Pour chaque règle entrante pour le port 443, pour Type, sélectionnez HTTPS.

Créer un rôle IAM pour que Lambda puisse lire un extrait de votre rubrique Amazon MSK

Identifiez les exigences d’authentification que Lambda doit lire dans votre rubrique Amazon MSK, puis définissez-les dans une politique. Créez un rôle qui autorise Lambda à utiliser ces autorisations. lambdaAuthRole Autorisez les actions sur votre cluster Amazon MSK à l’aide d’actions IAM kafka-cluster. Autorisez ensuite Lambda à effectuer les EC2 actions Amazon MSK et kafka Amazon nécessaires pour découvrir et se connecter à votre cluster Amazon MSK, ainsi que les actions permettant à Lambda de CloudWatch consigner ce qu'il a fait.

Pour décrire les exigences d’authentification pour que Lambda puisse lire depuis Amazon MSK
  1. Rédigez un document de politique IAM (un document JSON) qui permet à Lambda de lire un extrait de votre sujet Kafka dans votre cluster Amazon MSK en utilisant votre groupe de consommateurs Kafka. clusterAuthPolicy Lambda nécessite qu’un groupe de consommateurs Kafka soit défini lors de la lecture.

    Modifiez le modèle suivant pour l’aligner sur vos prérequis :

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    Pour plus d’informations, consultez Authentification basée sur les rôles IAM. Lorsque vous rédigez votre politique :

    • Pour region etaccount-id, fournissez ceux qui hébergent votre cluster Amazon MSK.

    • PourmskClusterName, indiquez le nom de votre cluster Amazon MSK.

    • Pourcluster-uuid, fournissez l'UUID dans l'ARN de votre cluster Amazon MSK.

    • PourmskTopicName, indiquez le nom de votre sujet Kafka.

    • PourmskGroupName, indiquez le nom de votre groupe de consommateurs Kafka.

  2. Identifiez Amazon MSK, Amazon EC2 et CloudWatch les autorisations requises pour que Lambda découvre et connecte votre cluster Amazon MSK, puis enregistrez ces événements.

    La politique gérée AWSLambdaMSKExecutionRole définit de manière permissive les autorisations requises. Utilisez-la dans les étapes suivantes.

    Dans un environnement de production, évaluez AWSLambdaMSKExecutionRole pour restreindre votre politique de rôle d’exécution sur la base du principe du moindre privilège, puis rédigez une politique pour votre rôle qui remplace cette politique gérée.

Pour plus d’informations sur le langage de politique IAM, consultez la documentation IAM.

Maintenant que vous avez rédigé votre document de politique, créez une politique IAM afin de pouvoir l’attacher à votre rôle. Pour ce faire, vous pouvez utiliser la console en suivant la procédure ci-dessous.

Pour créer une politique IAM à partir de votre document de politique
  1. Connectez-vous à la console IAM AWS Management Console et ouvrez-la à https://console.aws.amazon.com/iam/l'adresse.

  2. Dans le panneau de navigation de gauche, choisissez Politiques.

  3. Choisissez Create Policy (Créer une politique).

  4. Dans la section Éditeur de politiques, choisissez l’option JSON.

  5. CollerclusterAuthPolicy.

  6. Lorsque vous avez fini d’ajouter des autorisations à la politique, choisissez Suivant.

  7. Sur la page Vérifier et créer, tapez un Nom de politique et une Description (facultative) pour la politique que vous créez. Vérifiez les Autorisations définies dans cette politique pour voir les autorisations accordées par votre politique.

  8. Choisissez Create policy (Créer une politique) pour enregistrer votre nouvelle politique.

Pour plus d’informations, consultez Création de politiques IAM dans la documentation IAM.

Maintenant que vous disposez des politiques IAM appropriées, créez un rôle et attachez-les à celui-ci. Pour ce faire, vous pouvez utiliser la console en suivant la procédure ci-dessous.

Pour créer un rôle d’exécution dans la console IAM
  1. Ouvrez la page Roles (Rôles) dans la console IAM.

  2. Sélectionnez Créer un rôle.

  3. Sous Trusted entity type (Type d’entité approuvée), choisissez service AWS .

  4. Sous Cas d’utilisation, choisissez Lambda.

  5. Choisissez Suivant.

  6. Sélectionnez les stratégies suivantes :

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. Choisissez Suivant.

  8. Dans Nom du rôle, entrez lambdaAuthRole puis choisissez Créer un rôle.

Pour de plus amples informations, veuillez consulter Définition des autorisations de fonction Lambda avec un rôle d’exécution.

Créer une fonction Lambda pour lire à partir de votre rubrique Amazon MSK

Créez une fonction Lambda configurée pour utiliser votre rôle IAM. Vous pouvez enregistrer votre fonction Lambda à l’aide de la console.

Pour créer une fonction Lambda à l’aide de votre configuration d’authentification
  1. Ouvrez la console Lambda et choisissez Créer une fonction dans l’en-tête.

  2. Sélectionnez Créer à partir de zéro.

  3. Pour Nom de la fonction, saisissez un nom approprié de votre choix.

  4. Pour Environnement d’exécution, choisissez la dernière version prise en charge (Dernier pris en charge) de Node.js pour utiliser le code fourni dans ce tutoriel.

  5. Choisissez Modifier le rôle d’exécution par défaut.

  6. Sélectionnez Utiliser un rôle existant.

  7. Pour Rôle existant, sélectionnezlambdaAuthRole.

Dans un environnement de production, vous devez généralement ajouter des politiques supplémentaires au rôle d’exécution de votre fonction Lambda afin de traiter intelligemment vos événements Amazon MSK. Pour plus d’informations sur l’ajout de politiques à votre rôle, consultez la section Ajouter ou supprimer des autorisations d’identité dans la documentation IAM.

Création d’un mappage des sources d’événements pour votre fonction Lambda

Votre mappage des sources d’événements Amazon MSK fournit au service Lambda les informations nécessaires pour invoquer votre fonction Lambda lorsque des événements Amazon MSK appropriés se produisent. Vous pouvez créer un mappage Amazon MSK à l’aide de la console. Créez un déclencheur Lambda, puis le mappage des sources d’événements est automatiquement configuré.

Pour créer un déclenceur Lambda (et un mappage des sources d’événements)
  1. Accédez à la page de présentation de votre fonction Lambda.

  2. Dans la section de présentation de la fonction, choisissez Ajouter un déclencheur en bas à gauche.

  3. Dans le menu déroulant Sélectionner une source, sélectionnez Amazon MSK.

  4. Ne configurez pas l’authentification.

  5. Pour Cluster MSK, sélectionnez le nom de votre cluster.

  6. Pour Taille de lot, saisissez 1. Cette étape facilite le test de cette fonctionnalité. Elle ne constitue pas une valeur idéale en production.

  7. Pour Nom de la rubrique, indiquez le nom de votre rubrique Kafka.

  8. Pour ID du groupe de consommateurs, indiquez l’ID de votre groupe de consommateurs Kafka.

Mise à jour de votre fonction Lambda pour lire vos données de streaming

Lambda fournit des informations sur les événements Kafka via le paramètre de méthode d’événement. Pour obtenir un exemple de structure d’un événement Amazon MSK, consultez Exemple d’évènement. Après avoir compris comment interpréter les événements Amazon MSK transférés par Lambda, vous pouvez modifier le code de votre fonction Lambda pour utiliser les informations qu’ils fournissent.

Fournissez le code suivant à votre fonction Lambda pour journaliser le contenu d’un événement Lambda Amazon MSK à des fins de test :

.NET
AWS SDK for .NET
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Consommation d’un événement Amazon MSK avec Lambda en utilisant .NET.

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Go
Kit SDK for Go V2
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Consommation d’un événement Amazon MSK avec Lambda en utilisant Go.

package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
Java
SDK pour Java 2.x
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Consommation d’un événement Amazon MSK avec Lambda en utilisant Java.

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK pour JavaScript (v3)
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Utilisation d'un événement Amazon MSK avec JavaScript Lambda à l'aide de.

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
PHP
Kit SDK pour PHP
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Consommation d’un événement Amazon MSK avec Lambda en utilisant PHP.

<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK pour Python (Boto3)
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Consommation d’un événement Amazon MSK avec Lambda en utilisant Python.

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
Kit SDK pour Ruby
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Consommation d’un événement Amazon MSK avec Lambda en utilisant Ruby.

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end
Rust
SDK pour Rust
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Utilisation d'un événement Amazon MSK avec Lambda à l'aide de Rust.

use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }

Vous pouvez fournir le code de fonction à votre fonction Lambda à l’aide de la console.

Pour mettre à jour le code de fonction à l’aide de l’éditeur de code de la console
  1. Ouvrez la page Fonctions de la console Lambda et choisissez votre fonction.

  2. Sélectionnez l’onglet Code.

  3. Dans le volet Source du code, sélectionnez votre fichier de code source et modifiez-le dans l’éditeur de code intégré.

  4. Dans la section DÉPLOYER, choisissez Déployer pour mettre à jour le code de votre fonction :

    Bouton de déploiement dans l’éditeur de code de la console Lambda

Test de votre fonction Lambda pour vérifier qu’elle est connectée à votre rubrique Amazon MSK

Vous pouvez désormais vérifier si votre Lambda est invoqué par la source d'événements en consultant les journaux d'événements. CloudWatch

Pour vérifier si votre fonction Lambda est invoquée
  1. Utilisez votre hôte administrateur Kafka pour générer des événements Kafka à l’aide de la CLI kafka-console-producer. Pour plus d’informations, consultez Write some events into the topic dans la documentation Kafka. Envoyez suffisamment d’événements pour remplir le lot défini en fonction de la taille du lot pour votre mappage des sources d’événements défini à l’étape précédente, sinon Lambda attendra d’autres informations pour procéder à l’invocation.

  2. Si votre fonction s'exécute, Lambda écrit ce qui s'est passé à. CloudWatch Dans la console, accédez à la page des détails de votre fonction Lambda.

  3. Sélectionnez l’onglet Configuration.

  4. Dans la barre latérale, sélectionnez Outils de surveillance et d’exploitation.

  5. Identifiez le groupe de CloudWatch journaux sous Configuration de la journalisation. Le groupe de journaux doit commencer par /aws/lambda. Choisissez le lien vers le groupe de journaux.

  6. Dans la CloudWatch console, examinez les événements du journal pour les événements du journal que Lambda a envoyés au flux de journal. Identifiez s’il existe des événements de journaux contenant le message de votre événement Kafka, comme dans l’image suivante. Si tel est le cas, vous avez connecté avec succès une fonction Lambda à Amazon MSK à l’aide d’un mappage des sources d’événements Lambda.

    Un événement de journal CloudWatch affichant les informations sur les événements extraites par le code fourni.