Utilisation du filtrage des événements avec une source d’événement Kinesis - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation du filtrage des événements avec une source d’événement Kinesis

Vous pouvez utiliser le filtrage d’événements pour contrôler les enregistrements d’un flux ou d’une file d’attente que Lambda envoie à votre fonction. Pour obtenir des informations générales sur le fonctionnement du filtrage des événements, consultez Contrôle des événements envoyés par Lambda à votre fonction.

Cette section porte sur le filtrage des événements pour les sources Kinesis.

Notions de base du filtrage des événements Kinesis

Supposons qu’un producteur insère des données au format JSON dans votre flux de données Kinesis. Un exemple d’enregistrement ressemblerait à ce qui suit, avec les données JSON converties en chaîne encodée en Base64 dans le champ data.

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Tant que les données insérées dans le flux par le producteur sont des données JSON valides, vous pouvez utiliser le filtrage d’événements pour filtrer les enregistrements à l’aide de la clé data. Supposons qu’un producteur insère des enregistrements dans votre flux Kinesis au format JSON suivant.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

Pour filtrer uniquement les enregistrements dont le type d’ordre est « buy », l’objet FilterCriteria serait le suivant.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

Pour plus de clarté, voici la valeur du Pattern de filtre étendu en JSON simple :

{ "data": { "order": { "type": [ "buy" ] } } }

Vous pouvez ajouter votre filtre à l'aide de la console AWS CLI ou d'un AWS SAM modèle.

Console

Pour ajouter ce filtre à l’aide de la console, suivez les instructions de Attacher des critères de filtre à un mappage de sources d’événements (console) et saisissez la chaîne suivante comme critère de filtre.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

Pour créer un nouveau mappage de source d'événements avec ces critères de filtre à l'aide de AWS Command Line Interface (AWS CLI), exécutez la commande suivante.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

Pour ajouter ces critères de filtre à un mappage des sources d’événements existant, exécutez la commande suivante.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

Pour ajouter ce filtre AWS SAM, ajoutez l'extrait suivant au modèle YAML de votre source d'événement.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Pour filtrer correctement les événements provenant de sources Kinesis, le champ de données et vos critères de filtre pour le champ de données doivent être au format JSON valide. Si l’un ou l’autre des champs n’est pas dans un format JSON valide, Lambda rejette le message ou lance une exception. Le tableau suivant résume le comportement spécifique :

Format des données entrantes Pas de modèle de filtre pour les propriétés des données Action obtenue.

JSON valide

JSON valide

Lambda filtre en fonction de vos critères de filtre.

JSON valide

Pas de modèle de filtre pour les propriétés des données

Lambda filtre (uniquement selon les autres propriétés de métadonnées) en fonction de vos critères de filtre.

JSON valide

Non JSON

Lambda lance une exception au moment de la création ou de la mise à jour du mappage de sources d’événements. Le modèle de filtre des propriétés de données doit être au format JSON valide.

Non JSON

JSON valide

Lambda rejette l’enregistrement.

Non JSON

Pas de modèle de filtre pour les propriétés des données

Lambda filtre (uniquement selon les autres propriétés de métadonnées) en fonction de vos critères de filtre.

Non JSON

Non JSON

Lambda lance une exception au moment de la création ou de la mise à jour du mappage de sources d’événements. Le modèle de filtre des propriétés de données doit être au format JSON valide.

Filtrage des enregistrements agrégés de Kinesis

Avec Kinesis, vous pouvez agréger plusieurs enregistrements en un seul enregistrement Kinesis Data Streams pour augmenter le débit de vos données. Lambda ne peut appliquer des critères de filtrage aux enregistrements agrégés que lorsque vous utilisez la distribution ramifiée améliorée de Kinesis. Le filtrage des enregistrements agrégés avec Kinesis standard n’est pas pris en charge. Lorsque vous utilisez la distribution ramifiée améliorée, vous configurez un consommateur Kinesis à débit dédié pour qu’il serve de déclencheur à votre fonction Lambda. Lambda filtre alors les enregistrements agrégés et ne transmet que les enregistrements qui répondent à vos critères de filtrage.

Pour en savoir plus sur l’agrégation d’enregistrements Kinesis, reportez-vous à la section Agrégation sur la page Concepts clés de Kinesis Producer Library (KPL). Pour en savoir plus sur l'utilisation de Lambda avec la fonction de ventilation améliorée de Kinesis, consultez la section Augmenter les performances de traitement des flux en temps réel grâce à la fonction de ventilation améliorée Amazon Kinesis Data Streams et à Lambda sur le blog consacré au calcul. AWS AWS