Prétraitement des données à l’aide d’une fonction Lambda - Guide du développeur d'Amazon Kinesis Data Analytics SQL pour applications

Pour les nouveaux projets, nous vous recommandons d'utiliser le nouveau service géré pour Apache Flink Studio plutôt que Kinesis Data Analytics SQL for Applications. Le service géré pour Apache Flink Studio allie facilité d’utilisation et capacités analytiques avancées, ce qui vous permet de créer des applications sophistiquées de traitement des flux en quelques minutes.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Prétraitement des données à l’aide d’une fonction Lambda

Note

Après le 12 septembre 2023, vous ne pourrez plus créer de nouvelles applications en utilisant Kinesis Data Firehose comme source si vous n’utilisez pas déjà Kinesis Data Analytics pour SQL. Pour plus d’informations, consultez Limites .

Si les données de votre flux nécessitent une conversion de format, une transformation, un enrichissement ou un filtrage, vous pouvez les prétraiter à l'aide d'une AWS Lambda fonction. Vous pouvez effectuer cette opération avant que le code SQL de votre application s'exécute ou avant que votre application crée un schéma à partir de votre flux de données.

L’utilisation d’une fonction Lambda de prétraitement des enregistrements est utile dans les cas suivants :

  • Transformation d’enregistrements à partir d’autres formats (comme KPL ou GZIP) en formats pouvant être analysés par Kinesis Data Analytics. Kinesis Data Analytics prend actuellement en charge les formats de données JSON ou CSV.

  • Développement des données dans un format qui est plus accessible pour des opérations telles que le regroupement ou la détection des anomalies. Par exemple, si plusieurs valeurs de données sont stockées ensemble dans une chaîne, vous pouvez développer les données en colonnes distinctes.

  • Enrichissement des données avec d’autres services Amazon, comme l’extrapolation ou la correction d’erreurs.

  • Application de transformation de chaîne complexe à des champs d'enregistrement.

  • Filtrage de données pour nettoyer les données.

L'utilisation d'une fonction Lambda pour le prétraitement des enregistrements

Lorsque vous créez votre application Kinesis Data Analytics, vous activez le prétraitement Lambda sur la page Se connecter à une source.

Pour utiliser une fonction Lambda pour prétraiter des enregistrements dans une application Kinesis Data Analytics
  1. Connectez-vous à la console Managed Service for Apache Flink AWS Management Console et ouvrez-la à l'adresse https://console.aws.amazon.com/kinesisanalytics.

  2. Sur la page Se connecter à une source pour votre application, choisissez Activé dans la section Prétraitement d’enregistrements avec AWS Lambda.

  3. Pour utiliser une fonction Lambda que vous avez déjà créée, choisissez la fonction dans la liste déroulante Fonction Lambda.

  4. Pour créer une nouvelle fonction Lambda à partir de l’un des modèles de prétraitement Lambda, choisissez le modèle dans la liste déroulante. Ensuite, choisissez Afficher <nom_modèle> dans Lambda pour modifier la fonction.

  5. Pour créer une nouvelle fonction Lambda, choisissez Créer. Pour plus d'informations sur la création d'une fonction Lambda, consultez les sections Créer une fonction HelloWorld Lambda et Explorez la console dans le manuel du développeur.AWS Lambda

  6. Choisissez la version de la fonction Lambda à utiliser. Pour utiliser la dernière version, choisissez $LATEST.

Lorsque vous choisissez ou créez une fonction Lambda pour le prétraitement d’enregistrements, les enregistrements sont prétraités avant que le code SQL de votre application ne s’exécute ou que votre application ne génère un schéma à partir des enregistrements.

Autorisations de prétraitement Lambda

Pour utiliser le prétraitement Lambda, le rôle IAM de l’application a besoin de la stratégie d’autorisations suivante :

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Métriques de prétraitement Lambda

Vous pouvez utiliser Amazon CloudWatch pour surveiller le nombre d'appels Lambda, le nombre d'octets traités, les réussites et les échecs, etc. Pour plus d'informations sur CloudWatch les métriques émises par le prétraitement Lambda de Kinesis Data Analytics, consultez Amazon Kinesis Analytics Metrics.

Utilisation AWS Lambda avec la bibliothèque Kinesis Producer

La bibliothèque producteur Kinesis (KPL) regroupe de petits enregistrements formatés par l'utilisateur en enregistrements plus volumineux allant jusqu'à 1 Mo afin de mieux utiliser le débit Amazon Kinesis Data Streams. La bibliothèque Kinesis Client Library (KCL) pour Java prend en charge la désagrégation de ces enregistrements. Cependant, vous devez utiliser un module spécial pour désagréger les enregistrements lorsque vous les utilisez AWS Lambda en tant que consommateur de vos streams.

Pour obtenir le code de projet et les instructions nécessaires, consultez les modules de désagrégation de la bibliothèque Kinesis Producer pour plus d'informations. AWS LambdaGitHub Vous pouvez utiliser les composants de ce projet pour traiter des données sérialisées KPL AWS Lambda dans Java, Node.js et Python. Vous pouvez également utiliser ces composants dans le cadre d’une application KCL multi-lang.

Modèle de données d'entrée d'événement modèle de réponse d'enregistrement pour le prétraitement des données

Pour prétraiter des enregistrements, votre fonction Lambda doit être conforme aux modèles de données d’entrée d’événement et de réponse d’enregistrement imposés.

Modèle de données d'entrée d'événement

Kinesis Data Analytics lit en permanence les données de votre flux de données Kinesis ou de votre flux de diffusion Firehose. Pour chaque lot d’enregistrements qu’il récupère, le service gère la manière dont chaque lot est transmis à votre fonction Lambda. Votre fonction reçoit une liste d'enregistrements en entrée. Au sein de votre fonction, vous effectuez une itération dans la liste et vous appliquez votre logique métier pour réaliser vos exigences de prétraitement (par exemple, conversion du format de données ou enrichissement).

Le modèle d'entrée de votre fonction de prétraitement varie légèrement selon que les données proviennent d'un flux de données Kinesis ou d'un flux de diffusion Firehose.

Si la source est un flux de diffusion Firehose, le modèle de données d'entrée des événements est le suivant :

Modèle de données de demande Kinesis Data Firehose

Champ Description
invocationId ID d’appel Lambda (GUID aléatoire).
applicationArn Amazon Resource Name (ARN) de l’application Kinesis Data Analytics
streamArn ARN du flux de diffusion
enregistrements
Champ Description
recordId ID d'enregistrement (GUID aléatoire)
kinesisFirehoseRecordMetadata
Champ Description
approximateArrivalTimestamp Heure d'arrivée approximative de l'enregistrement de flux de diffusion
data Charge utile d'enregistrement source codée en base64

L'exemple suivant montre l'entrée d'un flux de diffusion Firehose :

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

Si la source est un flux de données Kinesis, le modèle de données d’entrée d’événement se présente comme suit :

Modèle de données de demande de flux Kinesis.

Champ Description
invocationId ID d’appel Lambda (GUID aléatoire).
applicationArn ARN de l’application Kinesis Data Analytics
streamArn ARN du flux de diffusion
enregistrements
Champ Description
recordId ID d'enregistrement basé sur le numéro de séquence d'enregistrement Kinesis
kinesisStreamRecordMetadata
Champ Description
sequenceNumber Numéro de séquence de l'enregistrement de flux Kinesis
partitionKey Clé de partition de l'enregistrement de flux Kinesis
shardId ShardId de l'enregistrement de flux Kinesis
approximateArrivalTimestamp Heure d'arrivée approximative de l'enregistrement de flux de diffusion
data Charge utile d'enregistrement source codée en base64

L'exemple suivant montre l'entrée d'un flux de données Kinesis :

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

Modèle de réponse d'enregistrement

Tous les enregistrements renvoyés à partir de votre fonction de prétraitement Lambda (avec les ID d'enregistrement) qui sont envoyés à la fonction Lambda doit être renvoyés. Ils doivent contenir les paramètres suivants. Sinon, Kinesis Data Analytics les rejette et les traite comme un échec de prétraitement des données. La partie charge utile des données de l'enregistrement peut être transformée pour réaliser les exigences de prétraitement.

Modèle de données de réponse

enregistrements
Champ Description
recordId L’ID d’enregistrement est transmis depuis Kinesis Data Analytics vers Lambda pendant l’invocation. L'enregistrement transformé doit comporter le même ID d'enregistrement. La moindre incohérence entre l'ID de l'enregistrement initial et l'ID de l'enregistrement transformé est traitée comme un échec du prétraitement des données.
result État de la transformation de données de l'enregistrement. Les valeurs possibles sont :
  • Ok : L’enregistrement a été transformé avec succès. Kinesis Data Analytics reçoit l’enregistrement pour le traitement SQL.

  • Dropped : L’enregistrement a été supprimé volontairement par votre logique de traitement. Kinesis Data Analytics supprime l’enregistrement du traitement SQL. Le champ de charge utile de données est facultatif pour un enregistrement Dropped.

  • ProcessingFailed : L’enregistrement n’a pas pu être transformé. Kinesis Data Analytics le considère comme non traité avec succès par votre fonction Lambda et écrit une erreur dans le flux d’erreur. Pour plus d'informations sur le flux d'erreur, consultez Gestion des erreurs. Le champ de charge utile de données est facultatif pour un enregistrement ProcessingFailed.

data Charge utile des données transformées, d'après l'encodage en base64. Chaque charge utile de données peut contenir plusieurs documents JSON si le format de données d'ingestion de l'application est JSON. Ou chaque charge utile de données peut contenir plusieurs lignes CSV (avec un délimiteur de ligne indiqué dans chaque ligne) si le format de données d'ingestion de l'application est CSV. Le service Kinesis Data Analytics analyse et traite les données correctement avec plusieurs documents JSON ou lignes CSV dans la même charge utile de données.

L'exemple suivant montre la sortie d'une fonction Lambda :

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

Échecs courants du prétraitement des données

Voici les raisons courantes pour lesquelles le prétraitement peut échouer.

  • Tous les enregistrements (avec ID d’enregistrement) d’un lot qui sont envoyés à la fonction Lambda ne sont pas renvoyés au service Kinesis Data Analytics.

  • L'ID d'enregistrement ou le champ de charge utile de données est manquant dans la réponse. Le champ de charge utile de données est facultatif pour un enregistrement Dropped ou ProcessingFailed.

  • Les délais d’expiration de la fonction Lambda ne sont pas suffisants pour prétraiter les données.

  • La réponse de la fonction Lambda dépasse les limites de réponse imposées par le service AWS Lambda .

En cas d’échec de prétraitement des données, Kinesis Data Analytics continue de relancer les invocations Lambda sur le même ensemble d’enregistrements jusqu’à ce que cela aboutisse. Vous pouvez surveiller les CloudWatch indicateurs suivants pour mieux comprendre les défaillances.

  • MillisBehindLatest de l’application Kinesis Data Analytics : indique le retard d’une application pour la lecture de la source de streaming.

  • Indicateurs de InputPreprocessing CloudWatch l'application Kinesis Data Analytics : indiquent le nombre de réussites et d'échecs, entre autres statistiques. Pour plus d'informations, consultez Métriques Amazon Kinesis Analytics.

  • AWS Lambda CloudWatch métriques et journaux des fonctions.