Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation du filtrage des événements avec une source d’événement Kinesis
Vous pouvez utiliser le filtrage d’événements pour contrôler les enregistrements d’un flux ou d’une file d’attente que Lambda envoie à votre fonction. Pour obtenir des informations générales sur le fonctionnement du filtrage des événements, consultez Contrôle des événements envoyés par Lambda à votre fonction.
Cette section porte sur le filtrage des événements pour les sources Kinesis.
Rubriques
Notions de base du filtrage des événements Kinesis
Supposons qu’un producteur insère des données au format JSON dans votre flux de données Kinesis. Un exemple d’enregistrement ressemblerait à ce qui suit, avec les données JSON converties en chaîne encodée en Base64 dans le champ data
.
{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }
Tant que les données insérées dans le flux par le producteur sont des données JSON valides, vous pouvez utiliser le filtrage d’événements pour filtrer les enregistrements à l’aide de la clé data
. Supposons qu’un producteur insère des enregistrements dans votre flux Kinesis au format JSON suivant.
{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }
Pour filtrer uniquement les enregistrements dont le type d’ordre est « buy », l’objet FilterCriteria
serait le suivant.
{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }
Pour plus de clarté, voici la valeur du Pattern
de filtre étendu en JSON simple :
{ "data": { "order": { "type": [ "buy" ] } } }
Vous pouvez ajouter votre filtre à l'aide de la console AWS CLI ou d'un AWS SAM modèle.
Pour filtrer correctement les événements provenant de sources Kinesis, le champ de données et vos critères de filtre pour le champ de données doivent être au format JSON valide. Si l’un ou l’autre des champs n’est pas dans un format JSON valide, Lambda rejette le message ou lance une exception. Le tableau suivant résume le comportement spécifique :
Format des données entrantes | Pas de modèle de filtre pour les propriétés des données | Action obtenue. |
---|---|---|
JSON valide |
JSON valide |
Lambda filtre en fonction de vos critères de filtre. |
JSON valide |
Pas de modèle de filtre pour les propriétés des données |
Lambda filtre (uniquement selon les autres propriétés de métadonnées) en fonction de vos critères de filtre. |
JSON valide |
Non JSON |
Lambda lance une exception au moment de la création ou de la mise à jour du mappage de sources d’événements. Le modèle de filtre des propriétés de données doit être au format JSON valide. |
Non JSON |
JSON valide |
Lambda rejette l’enregistrement. |
Non JSON |
Pas de modèle de filtre pour les propriétés des données |
Lambda filtre (uniquement selon les autres propriétés de métadonnées) en fonction de vos critères de filtre. |
Non JSON |
Non JSON |
Lambda lance une exception au moment de la création ou de la mise à jour du mappage de sources d’événements. Le modèle de filtre des propriétés de données doit être au format JSON valide. |
Filtrage des enregistrements agrégés de Kinesis
Avec Kinesis, vous pouvez agréger plusieurs enregistrements en un seul enregistrement Kinesis Data Streams pour augmenter le débit de vos données. Lambda ne peut appliquer des critères de filtrage aux enregistrements agrégés que lorsque vous utilisez la distribution ramifiée améliorée de Kinesis. Le filtrage des enregistrements agrégés avec Kinesis standard n’est pas pris en charge. Lorsque vous utilisez la distribution ramifiée améliorée, vous configurez un consommateur Kinesis à débit dédié pour qu’il serve de déclencheur à votre fonction Lambda. Lambda filtre alors les enregistrements agrégés et ne transmet que les enregistrements qui répondent à vos critères de filtrage.
Pour en savoir plus sur l’agrégation d’enregistrements Kinesis, reportez-vous à la section Agrégation sur la page Concepts clés de Kinesis Producer Library (KPL). Pour en savoir plus sur l'utilisation de Lambda avec la fonction de ventilation améliorée de Kinesis, consultez la section Augmenter les performances de traitement des flux en temps réel grâce à la fonction de ventilation améliorée Amazon Kinesis Data Streams et à Lambda sur le blog