Utilisation du filtrage des événements avec une source d'événements Kinesis - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation du filtrage des événements avec une source d'événements Kinesis

Vous pouvez utiliser le filtrage d’événements pour contrôler les enregistrements d’un flux ou d’une file d’attente que Lambda envoie à votre fonction. Pour des informations générales sur le fonctionnement du filtrage des événements, consultezContrôlez les événements que Lambda envoie à votre fonction.

Cette section se concentre sur le filtrage des événements pour les sources d'événements Kinesis.

Principes de base du filtrage des événements Kinesis

Supposons qu'un producteur insère des données JSON formatées dans votre flux de données Kinesis. Un exemple d'enregistrement ressemblerait à ce qui suit, avec les JSON données converties en une chaîne codée Base64 dans le data champ.

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Tant que les données que le producteur introduit dans le flux sont validesJSON, vous pouvez utiliser le filtrage des événements pour filtrer les enregistrements à l'aide de la data clé. Supposons qu'un producteur insère des enregistrements dans votre flux Kinesis au format suivantJSON.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

Pour filtrer uniquement les enregistrements dont le type d’ordre est « buy », l’objet FilterCriteria serait le suivant.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

Pour plus de clarté, voici la valeur du filtre Pattern expansé en clairJSON.

{ "data": { "order": { "type": [ "buy" ] } } }

Vous pouvez ajouter votre filtre à l'aide de la console AWS CLI ou d'un AWS SAM modèle.

Console

Pour ajouter ce filtre à l’aide de la console, suivez les instructions de Attacher des critères de filtre à un mappage de sources d’événements (console) et saisissez la chaîne suivante comme critère de filtre.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

Pour créer un nouveau mappage de source d'événements avec ces critères de filtre à l'aide de AWS Command Line Interface (AWS CLI), exécutez la commande suivante.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

Pour ajouter ces critères de filtre à un mappage des sources d’événements existant, exécutez la commande suivante.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

Pour ajouter ce filtre AWS SAM, ajoutez l'extrait suivant au YAML modèle de votre source d'événement.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Pour filtrer correctement les événements provenant des sources Kinesis, le champ de données et vos critères de filtrage pour le champ de données doivent être au format valideJSON. Si l'un des champs n'est pas dans un JSON format valide, Lambda supprime le message ou lance une exception. Le tableau suivant résume le comportement spécifique :

Format des données entrantes Pas de modèle de filtre pour les propriétés des données Action obtenue.

Valide JSON

Valide JSON

Lambda filtre en fonction de vos critères de filtre.

Valide JSON

Pas de modèle de filtre pour les propriétés des données

Lambda filtre (uniquement selon les autres propriétés de métadonnées) en fonction de vos critères de filtre.

Valide JSON

Non- JSON

Lambda lance une exception au moment de la création ou de la mise à jour du mappage de sources d’événements. Le modèle de filtre pour les propriétés des données doit être dans un JSON format valide.

Non- JSON

Valide JSON

Lambda rejette l’enregistrement.

Non- JSON

Pas de modèle de filtre pour les propriétés des données

Lambda filtre (uniquement selon les autres propriétés de métadonnées) en fonction de vos critères de filtre.

Non- JSON

Non- JSON

Lambda lance une exception au moment de la création ou de la mise à jour du mappage de sources d’événements. Le modèle de filtre pour les propriétés des données doit être dans un JSON format valide.

Filtrage des enregistrements agrégés de Kinesis

Avec Kinesis, vous pouvez agréger plusieurs enregistrements en un seul enregistrement Kinesis Data Streams pour augmenter le débit de vos données. Lambda ne peut appliquer des critères de filtrage aux enregistrements agrégés que lorsque vous utilisez la distribution ramifiée améliorée de Kinesis. Le filtrage des enregistrements agrégés avec Kinesis standard n’est pas pris en charge. Lorsque vous utilisez la distribution ramifiée améliorée, vous configurez un consommateur Kinesis à débit dédié pour qu’il serve de déclencheur à votre fonction Lambda. Lambda filtre alors les enregistrements agrégés et ne transmet que les enregistrements qui répondent à vos critères de filtrage.

Pour en savoir plus sur l'agrégation des enregistrements Kinesis, reportez-vous à la section Agrégation de la page Concepts clés de la Kinesis Producer Library (KPL). Pour en savoir plus sur l'utilisation de Lambda avec la fonction de ventilation améliorée de Kinesis, consultez la section Augmenter les performances de traitement des flux en temps réel grâce à la fonction de ventilation améliorée Amazon Kinesis Data Streams et à Lambda sur le blog consacré au calcul. AWS AWS