Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Vous pouvez utiliser une fonction Lambda pour traiter des enregistrements dans un flux de données Amazon Kinesis. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) Kinesis Data Streams ou à un consommateur à débit dédié avec diffusion améliorée. Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis pour des enregistrements qui utilisent le protocole HTTP. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.
Pour plus d’informations sur les flux de données Kinesis, consultez Lecture de données à partir d’Amazon Kinesis Data Streams.
Note
Kinesis facture chaque partition et, pour les diffusions améliorées, les données lues à partir du flux. Pour obtenir des informations de tarification, consultez Tarification Amazon Kinesis
Rubriques
Traitement des enregistrements Amazon Kinesis Data Streams avec Lambda
Configuration d’une réponse par lots partielle avec Kinesis Data Streams et Lambda
Implémentation du traitement des flux de données Kinesis avec état dans Lambda
Paramètres Lambda pour les mappages des sources d’événements Amazon Kinesis Data Streams
Utilisation du filtrage des événements avec une source d’événement Kinesis
Tutoriel : Utilisation de Lambda avec les flux de données Kinesis
Flux d’interrogation et de mise en lots
Lambda lit les enregistrements du flux de données et invoque votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les registres par lots et invoque votre fonction pour les traiter. Chaque lot contient des enregistrements provenant d’un seul flux de données/partition.
Pour les flux de données Kinesis standard, Lambda interroge les partitions de votre flux afin d’obtenir des enregistrements à une fréquence d’une fois par seconde pour chaque partition. Pour une diffusion améliorée Kinesis, Lambda utilise une connexion HTTP/2 pour écouter les enregistrements envoyés à partir de Kinesis. Lorsque des enregistrements sont disponibles, Lambda invoque votre fonction et attend le résultat.
Par défaut, Lambda invoque votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d’événements ne comprend qu’un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d’invoquer la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d’événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d’invoquer la fonction, Lambda continue de lire les registres de la source d’événements jusqu’à ce qu’il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour de plus amples informations, veuillez consulter Comportement de traitement par lots.
Avertissement
Les mappages des sources d’événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente
Lambda envoi le prochain lot pour traitement sans attendre que les extensions configurées soient terminées. En d’autres termes, vos extensions peuvent continuer à s’exécuter pendant que Lambda traite le prochain lot d’enregistrements. Cela peut causer des problèmes de limitation si vous enfreignez l’un des paramètres ou l’une des limites de simultanéité de votre compte. Pour détecter s’il s’agit d’un problème éventuel, surveillez vos fonctions et vérifiez si vous observez des métriques de simultanéité plus élevées que prévu pour votre mappage des sources d’événements. En raison de la brièveté des intervalles entre les invocations, Lambda peut brièvement signaler une utilisation simultanée supérieure au nombre de partitions. Cela peut être vrai même pour les fonctions Lambda sans extensions.
Configurez le ParallelizationFactorparamètre pour traiter une partition d'un flux de données Kinesis avec plusieurs invocations Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d’une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, quand vous définissez ParallelizationFactor
sur 2, vous pouvez avoir jusqu’à 200 invocations Lambda simultanés pour traiter 100 partitions de données Kinesis (bien que, dans la réalité, la métrique ConcurrentExecutions
puisse indiquer une valeur différente). Cela permet d’augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge
est élevée. Lorsque vous augmentez le nombre de lots simultanés par partition, Lambda assure toujours un traitement dans l’ordre au niveau clé de partition.
Vous pouvez également utiliser ParallelizationFactor
avec l’agrégation Kinesis. Le comportement du mappage des sources d’événements varie selon que vous utilisez ou non une diffusion améliorée :
-
Sans diffusion améliorée : tous les événements d’un événement agrégé doivent avoir la même clé de partition. La clé de partition doit également correspondre à celle de l’événement agrégé. Si les événements contenus dans l’événement agrégé ont des clés de partition différentes, Lambda ne peut garantir le traitement dans l’ordre des événements par clé de partition.
-
Avec une diffusion améliorée : Lambda décode d’abord l’événement agrégé en événements individuels. L’événement agrégé peut avoir une clé de partition différente de celle des événements qu’il contient. Cependant, les événements qui ne correspondent pas à la clé de partition sont supprimés et perdus
. Lambda ne traite pas ces événements et ne les envoie pas vers une destination en cas de panne configurée.
Exemple d’évènement
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
"approximateArrivalTimestamp": 1545084711.166
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
}