

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Tutoriel : Utilisation d’un mappage des sources d’événements Amazon MSK pour invoquer une fonction Lambda
<a name="services-msk-tutorial"></a>

Dans ce tutoriel, vous exécuterez les étapes suivantes :
+ Créez une fonction Lambda dans le même AWS compte qu'un cluster Amazon MSK existant.
+ Configurer le réseau et l’authentification pour que Lambda communique avec Amazon MSK.
+ Configurer un mappage des sources d’événements Amazon MSK Lambda, qui exécute votre fonction Lambda lorsque des événements apparaissent dans la rubrique.

Une fois ces étapes terminées, vous pouvez configurer une fonction Lambda pour traiter automatiquement les événements envoyés à Amazon MSK avec votre code Lambda personnalisé.

 **Que pouvez-vous faire avec cette fonctionnalité ?** 

**Exemple de solution : Utiliser un mappage des sources d’événements MSK pour fournir des résultats en direct à vos clients.**

Imaginons le scénario suivant : votre entreprise héberge une application Web dans laquelle vos clients peuvent consulter des informations sur des événements en direct, tels que des matchs de sport. Les informations actualisées du jeu sont fournies à votre équipe via une rubrique Kafka sur Amazon MSK. Vous souhaitez concevoir une solution qui utilise les mises à jour issues de la rubrique MSK afin de fournir une vue actualisée de l’événement en direct aux clients au sein d’une application que vous développez. Vous avez opté pour l’approche de conception suivante : vos applications clientes communiqueront avec un dorsal sans serveur hébergé dans AWS. Les clients se connecteront via des sessions websocket à l'aide de l'API Amazon WebSocket API Gateway.

Dans cette solution, vous avez besoin d’un composant qui lit les événements MSK, exécute une logique personnalisée pour préparer ces événements pour la couche application, puis transmet ces informations à l’API API Gateway. Vous pouvez implémenter ce composant en fournissant votre logique personnalisée dans une fonction Lambda, puis en l'appelant à l'aide d'un mappage de source d'événements AWS Lambda Amazon MSK. AWS Lambda

Pour plus d'informations sur la mise en œuvre de solutions à l'aide de l'API Amazon WebSocket API Gateway, consultez les [WebSocket didacticiels](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html) sur les API dans la documentation d'API Gateway.

## Conditions préalables
<a name="w2aad101c23c15c35c19"></a>

Un AWS compte avec les ressources préconfigurées suivantes :

**Pour remplir ces prérequis, nous vous recommandons de suivre [Get started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html) dans la documentation Amazon MSK.**
+ Un cluster Amazon MSK. Consultez [Create an Amazon MSK cluster](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html) dans *Getting started using Amazon MSK*.
+ La configuration suivante :
  + Assurez-vous que l’**authentification basée sur les rôles IAM** est **Activée** dans les paramètres de sécurité de votre cluster. Cela améliore votre sécurité en limitant votre fonction Lambda à l’accès aux ressources Amazon MSK nécessaires uniquement. L’authentification basée sur les rôles IAM est activée par défaut sur les nouveaux clusters Amazon MSK.
  + Assurez-vous que l’**Accès public** est désactivé dans les paramètres réseau de votre cluster. Restreindre l’accès à Internet de votre cluster Amazon MSK améliore votre sécurité en limitant le nombre d’intermédiaires qui traitent vos données. L’accès public est activé par défaut sur les nouveaux clusters Amazon MSK.
+ Une rubrique Kafka dans votre cluster Amazon MSK à utiliser pour cette solution. Consultez [Create a topic](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html) dans *Getting started using Amazon MSK*.
+ Un hôte administrateur Kafka configuré pour extraire les informations de votre cluster Kafka et envoyer des événements Kafka à votre rubrique à des fins de test, par exemple une instance Amazon EC2 sur laquelle la CLI d’administration Kafka et la bibliothèque IAM Amazon MSK sont installées. Consultez [Create a client machine](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html) dans *Getting started using Amazon MSK*.

Une fois que vous avez configuré ces ressources, collectez les informations suivantes à partir de votre AWS compte pour confirmer que vous êtes prêt à continuer.
+ Le nom de votre cluster Amazon MSK. Vous pouvez trouver cette information dans la console Amazon MSK.
+ L’UUID du cluster, qui fait partie de l’ARN de votre cluster Amazon MSK, que vous pouvez trouver dans la console Amazon MSK. Suivez les procédures décrites dans la rubrique [Listing clusters](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html) de la documentation Amazon MSK pour trouver cette information.
+ Les groupes de sécurité associés à votre cluster Amazon MSK. Vous pouvez trouver cette information dans la console Amazon MSK. Dans les étapes suivantes, appelez-les vos*clusterSecurityGroups*.
+ L’ID du VPC Amazon contenant votre cluster Amazon MSK. Vous pouvez trouver cette information en identifiant les sous-réseaux associés à votre cluster Amazon MSK dans la console Amazon MSK, puis en identifiant le VPC Amazon associé au sous-réseau dans la console Amazon VPC.
+ Le nom de la rubrique Kafka utilisée dans votre solution. Vous pouvez trouver cette information en appelant votre cluster Amazon MSK à l’aide de la CLI `topics` Kafka depuis votre hôte administrateur Kafka. Pour plus d’informations sur la CLI de rubriques, consultez la section [Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic) dans la documentation Kafka.
+ Le nom d’un groupe de consommateurs pour votre rubrique Kafka, adapté à une utilisation par votre fonction Lambda. Ce groupe peut être créé automatiquement par Lambda. Vous n’avez donc pas besoin de le créer avec la CLI Kafka. Si vous devez gérer vos groupes de consommateurs, pour en savoir plus sur la CLI de groupes de consommateurs, consultez la rubrique [Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group) dans la documentation Kafka.

Les autorisations suivantes dans votre AWS compte :
+ L’autorisation de créer et de gérer une fonction Lambda.
+ L’autorisation de créer des politiques IAM et de les associer à votre fonction Lambda.
+ L’autorisation de créer des points de terminaison de VPC Amazon et de modifier la configuration réseau dans le VPC Amazon hébergeant votre cluster Amazon MSK.

### Installez le AWS Command Line Interface
<a name="install_aws_cli"></a>

Si vous ne l'avez pas encore installé AWS Command Line Interface, suivez les étapes décrites dans la [section Installation ou mise à jour de la dernière version du AWS CLI pour l'](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)installer.

Ce tutoriel nécessite un terminal de ligne de commande ou un shell pour exécuter les commandes. Sous Linux et macOS, utilisez votre gestionnaire de shell et de package préféré.

**Note**  
Sous Windows, certaines commandes CLI Bash que vous utilisez couramment avec Lambda (par exemple `zip`) ne sont pas prises en charge par les terminaux intégrés du système d’exploitation. [Installez le sous-système Windows pour Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10) afin d’obtenir une version intégrée à Windows d’Ubuntu et Bash. 

## Configurer la connectivité réseau pour que Lambda communique avec Amazon MSK
<a name="w2aad101c23c15c35c21"></a>

 AWS PrivateLink À utiliser pour connecter Lambda et Amazon MSK. Pour ce faire, vous créez des points de terminaison de VPC Amazon d’interface dans la console Amazon VPC. Pour plus d’informations sur la configuration réseau, consultez [Configuration de votre cluster Amazon MSK et de votre réseau Amazon VPC pour Lambda](with-msk-cluster-network.md). 

Lorsqu’un mappage des sources d’événements Amazon MSK s’exécute pour le compte d’une fonction Lambda, il endosse le rôle d’exécution de la fonction Lambda. Ce rôle IAM autorise le mappage pour accéder aux ressources sécurisées par IAM, telles que votre cluster Amazon MSK. Bien que les composants partagent un rôle d’exécution, le mappage Amazon MSK et votre fonction Lambda ont des exigences de connectivité distinctes pour leurs tâches respectives, comme le montre le schéma suivant.

![\[\]](http://docs.aws.amazon.com/fr_fr/lambda/latest/dg/images/msk_tut_network.png)


Votre mappage des sources d’événements appartient au groupe de sécurité de votre cluster Amazon MSK. Au cours de cette étape de mise en réseau, créez des points de terminaison de VPC Amazon à partir de votre VPC de cluster Amazon MSK pour connecter le mappage des sources d’événements aux services Lambda et STS. Sécurisez ces points de terminaison pour accepter le trafic provenant du groupe de sécurité de votre cluster Amazon MSK. Ajustez ensuite les groupes de sécurité du cluster Amazon MSK pour permettre au mappage des sources d’événements de communiquer avec le cluster Amazon MSK.

 Vous pouvez configurer les étapes suivantes à l’aide de l’ AWS Management Console.

**Pour configurer les points de terminaison de VPC Amazon d’interface afin de connecter Lambda et Amazon MSK**

1. Créez un groupe de sécurité pour les points de terminaison Amazon VPC de votre interface*endpointSecurityGroup*, qui autorise le trafic TCP entrant sur 443 depuis. *clusterSecurityGroups* Suivez la procédure décrite dans la rubrique [Create a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group) de la documentation Amazon EC2 pour créer un groupe de sécurité. Suivez ensuite la procédure décrite dans la rubrique [Add rules to a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) de la documentation Amazon EC2 pour ajouter les règles appropriées. 

   **Créez un groupe de sécurité avec les informations suivantes :**

   Lorsque vous ajoutez vos règles de trafic entrant, créez une règle pour chaque groupe de sécurité dans*clusterSecurityGroups*. Pour chaque règle :
   + Dans le champ **Type**, sélectionnez **HTTPS**.
   + Pour **Source**, sélectionnez l'un des*clusterSecurityGroups*.

1.  Créez un point de terminaison connectant le service Lambda au VPC Amazon qui contient votre cluster Amazon MSK. Suivez la procédure décrite dans [Create an interface endpoint](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html).

   **Créez un point de terminaison d’interface avec les informations suivantes :**
   + Dans **Nom du service**`com.amazonaws.regionName.lambda`, sélectionnez où *regionName* héberge votre fonction Lambda.
   + Pour **VPC**, sélectionnez le VPC Amazon contenant votre cluster Amazon MSK.
   + Pour les **groupes de sécurité***endpointSecurityGroup*, sélectionnez ceux que vous avez créés précédemment.
   + Pour **Sous-réseaux**, sélectionnez les sous-réseaux qui hébergent votre cluster Amazon MSK.
   + Pour **Politique**, fournissez le document de politique suivant, qui sécurise le point de terminaison afin qu’il soit utilisé par le principal de service Lambda pour l’action `lambda:InvokeFunction`.

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + Assurez-vous que **Activer le nom DNS** reste défini.

1.  Créez un point de terminaison connectant le AWS STS service au Amazon VPC contenant votre cluster Amazon MSK. Suivez la procédure décrite dans [Create an interface endpoint](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html).

   **Créez un point de terminaison d’interface avec les informations suivantes :**
   + Pour **Nom du service**, sélectionnez AWS STS.
   + Pour **VPC**, sélectionnez le VPC Amazon contenant votre cluster Amazon MSK.
   + Pour les **groupes de sécurité**, sélectionnez*endpointSecurityGroup*.
   + Pour **Sous-réseaux**, sélectionnez les sous-réseaux qui hébergent votre cluster Amazon MSK.
   + Pour **Politique**, fournissez le document de politique suivant, qui sécurise le point de terminaison afin qu’il soit utilisé par le principal de service Lambda pour l’action `sts:AssumeRole`.

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + Assurez-vous que **Activer le nom DNS** reste défini.

1. Pour chaque groupe de sécurité associé à votre cluster Amazon MSK, c'est-à-dire dans*clusterSecurityGroups*, autorisez ce qui suit :
   + Autorisez tout le trafic TCP entrant et sortant sur le 9098 à tous*clusterSecurityGroups*, y compris à l'intérieur de celui-ci.
   + Autorisez tout le trafic TCP sortant sur le port 443.

   Une partie de ce trafic est autorisée par les règles des groupes de sécurité par défaut. Par conséquent, si votre cluster est attaché à un seul groupe de sécurité et que ce groupe possède des règles par défaut, il n’est pas nécessaire d’ajouter des règles. Pour ajuster les règles de groupe de sécurité, suivez les procédures décrites dans la rubrique [Add rules to a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) de la documentation Amazon EC2.

   **Ajoutez des règles à vos groupes de sécurité avec les informations suivantes :**
   + Pour chaque règle entrante ou sortante pour le port 9098, indiquez
     + Pour **Type**, sélectionnez **Custom TCP (TCP personnalisé)**.
     + Pour **Plage de ports**, indiquez 9098.
     + Pour **Source**, indiquez l'un des*clusterSecurityGroups*.
   + Pour chaque règle entrante pour le port 443, pour **Type**, sélectionnez **HTTPS**.

## Créer un rôle IAM pour que Lambda puisse lire un extrait de votre rubrique Amazon MSK
<a name="w2aad101c23c15c35c23"></a>

Identifiez les exigences d’authentification que Lambda doit lire dans votre rubrique Amazon MSK, puis définissez-les dans une politique. Créez un rôle qui autorise Lambda à utiliser ces autorisations. *lambdaAuthRole* Autorisez les actions sur votre cluster Amazon MSK à l’aide d’actions IAM `kafka-cluster`. Autorisez ensuite Lambda à effectuer les actions Amazon MSK et Amazon `kafka` EC2 nécessaires pour découvrir et vous connecter à votre cluster Amazon MSK, ainsi que les actions permettant à Lambda de consigner CloudWatch ce qu'il a fait.

**Pour décrire les exigences d’authentification pour que Lambda puisse lire depuis Amazon MSK**

1. Rédigez un document de politique IAM (un document JSON) qui permet à Lambda de lire un extrait de votre sujet Kafka dans votre cluster Amazon MSK en utilisant votre groupe de consommateurs Kafka. *clusterAuthPolicy* Lambda nécessite qu’un groupe de consommateurs Kafka soit défini lors de la lecture.

   Modifiez le modèle suivant pour l’aligner sur vos prérequis :

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   Pour plus d’informations, consultez [Configuration des autorisations Lambda pour les mappages de sources d'événements Amazon MSK](with-msk-permissions.md). Lorsque vous rédigez votre politique :
   + Remplacez *us-east-1* et *111122223333* par le Région AWS et Compte AWS de votre cluster Amazon MSK.
   + Pour*mskClusterName*, indiquez le nom de votre cluster Amazon MSK.
   + Pour*cluster-uuid*, fournissez l'UUID dans l'ARN de votre cluster Amazon MSK.
   + Pour*mskTopicName*, indiquez le nom de votre sujet Kafka.
   + Pour*mskGroupName*, indiquez le nom de votre groupe de consommateurs Kafka.

1. Identifiez Amazon MSK, Amazon EC2 CloudWatch et les autorisations requises pour que Lambda découvre et connecte votre cluster Amazon MSK, puis enregistrez ces événements.

   La politique gérée `AWSLambdaMSKExecutionRole` définit de manière permissive les autorisations requises. Utilisez-la dans les étapes suivantes.

   Dans un environnement de production, évaluez `AWSLambdaMSKExecutionRole` pour restreindre votre politique de rôle d’exécution sur la base du principe du moindre privilège, puis rédigez une politique pour votre rôle qui remplace cette politique gérée.

Pour plus d’informations sur le langage de politique IAM, consultez la [documentation IAM](https://docs.aws.amazon.com//iam/).

Maintenant que vous avez rédigé votre document de politique, créez une politique IAM afin de pouvoir l’attacher à votre rôle. Pour ce faire, vous pouvez utiliser la console en suivant la procédure ci-dessous.

**Pour créer une politique IAM à partir de votre document de politique**

1. Connectez-vous à la console IAM AWS Management Console et ouvrez-la à [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)l'adresse.

1. Dans le panneau de navigation de gauche, choisissez **Politiques**. 

1. Choisissez **Create Policy** (Créer une politique).

1. Dans la section **Éditeur de politiques**, choisissez l’option **JSON**.

1. Coller*clusterAuthPolicy*.

1. Lorsque vous avez fini d’ajouter des autorisations à la politique, choisissez **Suivant**.

1. Sur la page **Vérifier et créer**, tapez un **Nom de politique** et une **Description** (facultative) pour la politique que vous créez. Vérifiez les **Autorisations définies dans cette politique** pour voir les autorisations accordées par votre politique.

1. Choisissez **Create policy** (Créer une politique) pour enregistrer votre nouvelle politique.

Pour plus d’informations, consultez [Création de politiques IAM](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html) dans la documentation IAM.

Maintenant que vous disposez des politiques IAM appropriées, créez un rôle et attachez-les à celui-ci. Pour ce faire, vous pouvez utiliser la console en suivant la procédure ci-dessous.

**Pour créer un rôle d’exécution dans la console IAM**

1. Ouvrez la page [Roles (Rôles)](https://console.aws.amazon.com/iam/home#/roles) dans la console IAM.

1. Choisissez **Créer un rôle**.

1. Sous **Trusted entity type** (Type d’entité approuvée), choisissez **service AWS **.

1. Sous **Cas d’utilisation**, choisissez **Lambda**.

1. Choisissez **Suivant**.

1. Sélectionnez les stratégies suivantes :
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. Choisissez **Suivant**.

1. Dans **Nom du rôle**, entrez *lambdaAuthRole* puis choisissez **Créer un rôle**.

Pour de plus amples informations, veuillez consulter [Définition des autorisations de fonction Lambda avec un rôle d’exécution](lambda-intro-execution-role.md).

## Créer une fonction Lambda pour lire à partir de votre rubrique Amazon MSK
<a name="w2aad101c23c15c35c25"></a>

Créez une fonction Lambda configurée pour utiliser votre rôle IAM. Vous pouvez enregistrer votre fonction Lambda à l’aide de la console.

**Pour créer une fonction Lambda à l’aide de votre configuration d’authentification**

1.  Ouvrez la console Lambda et choisissez **Créer une fonction** dans l’en-tête. 

1. Sélectionnez **Créer à partir de zéro**.

1. Pour **Nom de la fonction**, saisissez un nom approprié de votre choix.

1. Pour **Environnement d’exécution**, choisissez la dernière version prise en charge (**Dernier pris en charge**) de `Node.js` pour utiliser le code fourni dans ce tutoriel.

1. Choisissez **Modifier le rôle d’exécution par défaut**.

1. Sélectionnez **Utiliser un rôle existant**.

1. Pour **Rôle existant**, sélectionnez*lambdaAuthRole*.

Dans un environnement de production, vous devez généralement ajouter des politiques supplémentaires au rôle d’exécution de votre fonction Lambda afin de traiter intelligemment vos événements Amazon MSK. Pour plus d’informations sur l’ajout de politiques à votre rôle, consultez la section [Ajouter ou supprimer des autorisations d’identité](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console) dans la documentation IAM.

## Création d’un mappage des sources d’événements pour votre fonction Lambda
<a name="w2aad101c23c15c35c27"></a>

Votre mappage des sources d’événements Amazon MSK fournit au service Lambda les informations nécessaires pour invoquer votre fonction Lambda lorsque des événements Amazon MSK appropriés se produisent. Vous pouvez créer un mappage Amazon MSK à l’aide de la console. Créez un déclencheur Lambda, puis le mappage des sources d’événements est automatiquement configuré.

**Pour créer un déclenceur Lambda (et un mappage des sources d’événements)**

1. Accédez à la page de présentation de votre fonction Lambda.

1. Dans la section de présentation de la fonction, choisissez **Ajouter un déclencheur** en bas à gauche.

1. Dans le menu déroulant **Sélectionner une source**, sélectionnez **Amazon MSK**.

1. Ne configurez pas l’**authentification**.

1. Pour **Cluster MSK**, sélectionnez le nom de votre cluster.

1. Pour **Taille de lot**, saisissez 1. Cette étape facilite le test de cette fonctionnalité. Elle ne constitue pas une valeur idéale en production.

1. Pour **Nom de la rubrique**, indiquez le nom de votre rubrique Kafka.

1. Pour **ID du groupe de consommateurs**, indiquez l’ID de votre groupe de consommateurs Kafka.

## Mise à jour de votre fonction Lambda pour lire vos données de streaming
<a name="w2aad101c23c15c35c29"></a>

 Lambda fournit des informations sur les événements Kafka via le paramètre de méthode d’événement. Pour obtenir un exemple de structure d’un événement Amazon MSK, consultez [Exemple d’évènement](with-msk.md#msk-sample-event). Après avoir compris comment interpréter les événements Amazon MSK transférés par Lambda, vous pouvez modifier le code de votre fonction Lambda pour utiliser les informations qu’ils fournissent. 

 Fournissez le code suivant à votre fonction Lambda pour journaliser le contenu d’un événement Lambda Amazon MSK à des fins de test : 

------
#### [ .NET ]

**SDK pour .NET**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant .NET.  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**Kit SDK pour Go V2**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant Go.  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, _ := base64.StdEncoding.DecodeString(record.Value)
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK pour Java 2.x**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant Java.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK pour JavaScript (v3)**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Utilisation d'un événement Amazon MSK avec JavaScript Lambda à l'aide de.  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
Utilisation d'un événement Amazon MSK avec TypeScript Lambda à l'aide de.  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    };
  }
}
```

------
#### [ PHP ]

**Kit SDK pour PHP**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant PHP.  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**Kit SDK for Python (Boto3)**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant Python.  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**Kit SDK pour Ruby**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant Ruby.  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**SDK pour Rust**  
 Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’[exemples sans serveur](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda). 
Consommation d’un événement Amazon MSK avec Lambda en utilisant Rust.  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

Vous pouvez fournir le code de fonction à votre fonction Lambda à l’aide de la console.

**Pour mettre à jour le code de fonction à l’aide de l’éditeur de code de la console**

1. Ouvrez la [page Fonctions](https://console.aws.amazon.com/lambda/home#/functions) de la console Lambda et choisissez votre fonction.

1. Sélectionnez l’onglet **Code**.

1. Dans le volet **Source du code**, sélectionnez votre fichier de code source et modifiez-le dans l’éditeur de code intégré.

1. Dans la section **DÉPLOYER**, choisissez **Déployer** pour mettre à jour le code de votre fonction :  
![\[\]](http://docs.aws.amazon.com/fr_fr/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## Test de votre fonction Lambda pour vérifier qu’elle est connectée à votre rubrique Amazon MSK
<a name="w2aad101c23c15c35c31"></a>

Vous pouvez désormais vérifier si votre Lambda est invoqué par la source d'événements en consultant les journaux d'événements. CloudWatch 

**Pour vérifier si votre fonction Lambda est invoquée**

1. Utilisez votre hôte administrateur Kafka pour générer des événements Kafka à l’aide de la CLI `kafka-console-producer`. Pour plus d’informations, consultez [Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send) dans la documentation Kafka. Envoyez suffisamment d’événements pour remplir le lot défini en fonction de la taille du lot pour votre mappage des sources d’événements défini à l’étape précédente, sinon Lambda attendra d’autres informations pour procéder à l’invocation.

1. Si votre fonction s'exécute, Lambda écrit ce qui s'est passé à. CloudWatch Dans la console, accédez à la page des détails de votre fonction Lambda.

1. Sélectionnez l’onglet **Configuration**.

1. Dans la barre latérale, sélectionnez **Outils de surveillance et d’exploitation**.

1. Identifiez le **groupe de CloudWatch journaux** sous **Configuration de la journalisation**. Le groupe de journaux doit commencer par `/aws/lambda`. Choisissez le lien vers le groupe de journaux.

1. Dans la CloudWatch console, examinez les **événements du journal pour les événements** du journal que Lambda a envoyés au flux de journal. Identifiez s’il existe des événements de journaux contenant le message de votre événement Kafka, comme dans l’image suivante. Si tel est le cas, vous avez connecté avec succès une fonction Lambda à Amazon MSK à l’aide d’un mappage des sources d’événements Lambda.  
![\[\]](http://docs.aws.amazon.com/fr_fr/lambda/latest/dg/images/msk_tut_log.png)