Traitement des enregistrements Amazon Kinesis Data Streams avec Lambda - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Traitement des enregistrements Amazon Kinesis Data Streams avec Lambda

Pour traiter les enregistrements Amazon Kinesis Data Streams avec Lambda, créez un consommateur pour votre flux, puis créez un mappage des sources d’événements Lambda.

Configuration de votre fonction et de votre flux de données

Votre fonction Lambda est une application consommateur pour votre flux de données. Elle traite un lot d’enregistrements à la fois à partir de chaque partition. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) ou à un consommateur à débit dédié avec diffusion améliorée.

  • Itérateur standard : Lambda interroge chaque partition de votre flux Kinesis afin d’obtenir des enregistrements à une fréquence de base d’une fois par seconde. Lorsque d’autres enregistrements sont disponibles, Lambda continue de traiter les lots jusqu’à ce que la fonction rattrape le flux. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.

  • Diffusion améliorée : pour réduire la latence et optimiser le débit en lecture, créez un consommateur de flux de données avec diffusion améliorée. Les consommateurs avec diffusion améliorée obtiennent une connexion dédiée pour chaque partition qui n’a pas d’impact sur les autres applications lisant sur le flux. Les consommateurs de flux utilisent HTTP/2 afin de réduire la latence en transférant les enregistrements à Lambda via une connexion longue durée et en comprimant les en-têtes de requête. Vous pouvez créer un consommateur de flux avec l’API RegisterStreamConsumer de Kinesis.

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Vous devriez voir la sortie suivante:

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}

Pour augmenter la vitesse à laquelle votre fonction traite les enregistrements, ajoutez des partitions à votre flux de données. Lambda traite les enregistrements de chaque partition dans l’ordre. Il arrête de traiter les enregistrements supplémentaires d’une partition si votre fonction renvoie une erreur. Plus de partitions signifient plus de lots traités en une seule fois, ce qui réduit l’impact des erreurs sur la simultanéité.

Si votre fonction ne peut pas augmenter sa capacité pour traiter le nombre total de lots simultanés, demandez une augmentation de quota ou réservez de la simultanéité pour votre fonction.

Création d’un mappage des sources d’événements pour invoquer une fonction Lambda

Pour invoquer votre fonction Lambda avec des enregistrements provenant de votre flux de données, créez un mappage de sources d’événements. Vous pouvez créer plusieurs mappages de source d’événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou pour traiter des éléments en provenance de plusieurs flux de données avec une seule fonction. Lorsque vous traitez des éléments à partir de plusieurs flux, chaque lot ne contient que des enregistrements provenant d’une seule partition ou d’un seul flux.

Vous pouvez configurer des mappages de sources d’événements pour traiter les enregistrements d’un flux dans un autre Compte AWS. Pour en savoir plus, consultez Création d’un mappage de sources d’événements entre comptes.

Avant de créer un mappage de sources d’événements, vous devez autoriser votre fonction Lambda à lire à partir d’un flux de données Kinesis. Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux de données Kinesis :

La politique AWSLambdaKinesisExecutionRole gérée par AWS inclut ces autorisations. Ajoutez cette politique gérée à votre fonction comme décrit dans la procédure suivante.

AWS Management Console
Pour ajouter des autorisations Kinesis à votre fonction
  1. Ouvrez la page Fonctions de la console Lambda et choisissez votre fonction.

  2. Sous l’onglet Configuration, sélectionnez Autorisations.

  3. Dans le volet Rôle d’exécution, sous Nom du rôle, choisissez le lien vers le rôle d’exécution de votre fonction. Ce lien ouvre la page de ce rôle dans la console IAM.

  4. Dans le volet Politiques d’autorisations, choisissez Ajouter des autorisations, puis sélectionnez Attacher des politiques.

  5. Dans le champ de recherche, entrez AWSLambdaKinesisExecutionRole.

  6. Cochez la case en regard de la politique, puis choisissez Ajouter une autorisation.

AWS CLI
Pour ajouter des autorisations Kinesis à votre fonction
  • Exécutez la commande de la CLI suivante pour ajouter la politique AWSLambdaKinesisExecutionRole au rôle d’exécution de votre fonction :

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
Pour ajouter des autorisations Kinesis à votre fonction
  • Dans la définition de votre fonction, ajoutez la propriété Policies comme indiqué dans l’exemple ci-dessous :

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

Après avoir configuré les autorisations requises, créez le mappage des sources d’événements.

AWS Management Console
Pour créer le mappage des sources d’événements Kinesis
  1. Ouvrez la page Fonctions de la console Lambda et choisissez votre fonction.

  2. Dans le volet de Présentation de la fonction, choisissez Ajouter un déclencheur.

  3. Sous Configuration du déclencheur, pour la source, sélectionnez Kinesis.

  4. Sélectionnez le flux Kinesis pour lequel vous souhaitez créer le mappage des sources d’événements et, éventuellement, un consommateur de votre flux.

  5. (Facultatif) Modifiez la Taille de lot, la Position de départ et la Fenêtre de traitement par lot de votre mappage des sources d’événements.

  6. Choisissez Ajouter.

Lorsque vous créez votre mappage des sources d’événements à partir de la console, votre rôle IAM doit disposer des autorisations kinesis:ListStreams et kinesis:ListStreamConsumers.

AWS CLI
Pour créer le mappage des sources d’événements Kinesis
  • Exécutez la commande de la CLI suivante pour créer un mappage des sources d’événements Kinesis. Choisissez votre propre taille de lot et votre position de départ en fonction de votre cas d’utilisation.

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

Pour spécifier une fenêtre de traitement par lot, ajoutez l’option --maximum-batching-window-in-seconds. Pour plus d’informations sur ce paramètre et d’autres, consultez create-event-source-mapping dans la Référence des commandes de l’AWS CLI.

AWS SAM
Pour créer le mappage des sources d’événements Kinesis
  • Dans la définition de votre fonction, ajoutez la propriété KinesisEvent comme indiqué dans l’exemple ci-dessous :

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Pour en savoir plus sur la création d’un mappage des sources d’événements pour Kinesis Data Streams dans AWS SAM, consultez Kinesis dans le guide du développeur AWS Serverless Application Model.

Position de départ du sondage et du stream

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

Création d’un mappage de sources d’événements entre comptes

Amazon Kinesis Data Streams prend en charge les politiques basées sur les ressources. De ce fait, vous pouvez traiter les données ingérées dans un flux dans un Compte AWS à l’aide d’une fonction Lambda dans un autre compte.

Pour créer un mappage de sources d’événements pour votre fonction Lambda à l’aide d’un flux Kinesis dans un autre Compte AWS, vous devez configurer le flux à l’aide d’une politique basée sur les ressources afin d’autoriser votre fonction Lambda à lire des éléments. Pour savoir comment configurer votre flux de manière à autoriser l’accès intercomptes, consultez la rubrique Sharing access with cross-account AWS Lambda functions du guide du développeur Amazon Kinesis Streams.

Une fois que vous avez configuré votre flux avec une politique basée sur les ressources qui donne à votre fonction Lambda les autorisations requises, créez le mappage des sources d’événements à l’aide de l’une des méthodes décrites dans la section précédente.

Si vous choisissez de créer votre mappage des sources d’événements à l’aide de la console Lambda, collez l’ARN de votre flux directement dans la zone de saisie. Si vous souhaitez spécifier un consommateur pour votre flux, le champ du flux est automatiquement rempli lorsque l’ARN du consommateur est collé.