Tutoriel #2 : Utilisation de filtres pour traiter certains événements avec DynamoDB et Lambda - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Tutoriel #2 : Utilisation de filtres pour traiter certains événements avec DynamoDB et Lambda

Dans ce didacticiel, vous allez créer un AWS Lambda déclencheur pour traiter uniquement certains événements d'un flux à partir d'une table DynamoDB.

Avec le filtrage des événements Lambda, vous pouvez utiliser des expressions de filtre pour contrôler les événements envoyés par Lambda à votre fonction pour traitement. Vous pouvez configurer jusqu'à 5 filtres différents par flux DynamoDB. Si vous utilisez des fenêtres de traitement par lots, Lambda applique les critères de filtre à chaque nouvel événement pour déterminer s'il doit être ajouté au lot actuel.

Les filtres sont appliqués via des structures nommées FilterCriteria. Les 3 principaux attributs de FilterCriteria sont metadata properties, data properties et filter patterns.

Voici un exemple de structure d'un événement DynamoDB Streams :

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Les metadata properties correspondent aux champs de l'objet d'événement. Dans le cas de DynamoDB Streams, les metadata properties correspondent à des champs tels que dynamodb ou eventName.

Les data properties correspondent aux champs du corps de l'événement. Pour filtrer sur data properties, veillez à les contenir dans FilterCriteria à l'intérieur de la clé appropriée. Pour les sources d'événements DynamoDB, la clé de données est NewImage ou OldImage.

En dernier lieu, les règles de filtrage définissent l'expression de filtre que vous appliquez à une propriété spécifique. Voici quelques exemples :

Opérateur de comparaison Exemple Syntaxe des règles (partielle)

Null

Type de produit null

{ "product_type": { "S": null } }

Vide

Nom du produit vide

{ "product_name": { "S": [ ""] } }

Égal à

État égal à Floride

{ "state": { "S": ["FL"] } }

And

État du produit égal à Floride et catégorie de produit Chocolat

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Ou

L'État du produit est la Floride ou la Californie

{ "state": { "S": ["FL","CA"] } }

Pas

L'État du produit n'est pas la Floride

{"state": {"S": [{"anything-but": ["FL"]}]}}

Existe

Le produit fait maison existe

{"homemade": {"S": [{"exists": true}]}}

N’existe pas

Le produit fait maison n'existe pas

{"homemade": {"S": [{"exists": false}]}}

Commence par

PK commence par COMPANY

{"PK": {"S": [{"prefix": "COMPANY"}]}}

Vous pouvez spécifier jusqu'à 5 modèles de filtrage des événements pour une fonction Lambda. Notez que chacun de ces 5 événements sera évalué comme un OR logique. Donc, si vous configurez deux filtres nommés Filter_One et Filter_Two, la fonction Lambda exécutera Filter_One OR Filter_Two.

Note

La page de filtrage des événements Lambda propose certaines options permettant de filtrer et de comparer des valeurs numériques. Toutefois, dans le cas des événements de filtre DynamoDB, cela ne s'applique pas, car les nombres dans DynamoDB sont stockés sous forme de chaînes. Par exemple "quantity": { "N": "50" }, nous savons que c'est un numéro grâce à la propriété "N".

Synthèse – AWS CloudFormation

Pour illustrer la fonctionnalité de filtrage des événements dans la pratique, voici un exemple de CloudFormation modèle. Ce modèle va générer une table DynamoDB simple avec une clé de partition PK et une clé de tri SK avec Amazon DynamoDB Streams activé. Il créera une fonction Lambda et un rôle d'exécution Lambda simple qui permettront d'écrire des journaux dans Amazon Cloudwatch et de lire les événements depuis le flux Amazon DynamoDB. Il ajoutera également le mappage des sources d'événements entre les DynamoDB Streams et la fonction Lambda, afin que la fonction puisse être exécutée chaque fois qu'un événement se produit dans Amazon DynamoDB Stream.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

Après avoir déployé ce modèle CloudFormation, vous pouvez insérer l'élément Amazon DynamoDB suivant :

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Grâce à la fonction lambda simple incluse en ligne dans ce modèle de formation de cloud, vous verrez les événements relatifs à la fonction lambda dans les groupes de CloudWatch logs Amazon de la manière suivante :

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Exemples de filtrage

  • Uniquement les produits qui correspondent à un État donné

Cet exemple modifie le CloudFormation modèle pour inclure un filtre correspondant à tous les produits provenant de Floride, avec l'abréviation « FL ».

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Une fois que vous avez redéployé la pile, vous pouvez ajouter l'élément DynamoDB suivant à la table. Notez qu'il n'apparaîtra pas dans les journaux des fonctions Lambda, car le produit de cet exemple provient de Californie.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • Uniquement les éléments qui commencent par certaines valeurs dans PK et SK

Cet exemple modifie le CloudFormation modèle pour inclure la condition suivante :

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Notez que la AND condition nécessite qu'elle se trouve à l'intérieur du modèle, où les touches PK et SK sont dans la même expression séparées par des virgules.

Soit vous commencez avec des valeurs sur PK et SK, soit le produit provient d'un État donné.

Cet exemple modifie le CloudFormation modèle pour inclure les conditions suivantes :

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Notez que la condition OR est ajoutée en introduisant de nouveaux modèles dans la section des filtres.

Synthèse – CDK

L'exemple de modèle de formation de CDK projet suivant décrit les fonctionnalités de filtrage des événements. Avant de travailler sur ce CDK projet, vous devez installer les prérequis, notamment exécuter des scripts de préparation.

Création d'un CDK projet

Créez d'abord un nouveau AWS CDK projet en l'invoquant cdk init dans un répertoire vide.

mkdir ddb_filters cd ddb_filters cdk init app --language python

La commande cdk init utilise le nom du dossier du projet pour nommer les différents éléments du projet, notamment les classes, les sous-dossiers et les fichiers. Tous les traits d'union figurant dans le nom du dossier sont convertis en traits de soulignement. Dans le cas contraire, le nom doit prendre la forme d'un identifiant Python. Par exemple, il ne doit pas commencer par un chiffre ni contenir d'espaces.

Pour travailler avec le nouveau projet, activez son environnement virtuel. Cela permet d'installer les dépendances du projet localement dans le dossier du projet, plutôt que globalement.

source .venv/bin/activate python -m pip install -r requirements.txt
Note

Vous pouvez reconnaître qu'il s'agit de la commande Mac/Linux pour activer un environnement virtuel. Les modèles Python incluent un fichier batch, source.bat, qui permet d'utiliser la même commande sous Windows. La commande Windows traditionnelle .venv\Scripts\activate.bat fonctionne également. Si vous avez initialisé votre AWS CDK projet à l'aide de AWS CDK Toolkit v1.70.0 ou d'une version antérieure, votre environnement virtuel se trouve dans le .env répertoire au lieu de. .venv

Infrastructure de base

Ouvrez le fichier ./ddb_filters/ddb_filters_stack.py dans l'éditeur de texte de votre choix. Ce fichier a été généré automatiquement lorsque vous avez créé le AWS CDK projet.

Ensuite, ajoutez les fonctions _create_ddb_table et _set_ddb_trigger_function. Ces fonctions créeront une table DynamoDB avec la clé de partition PK et la clé de tri SK en mode provisionné et mode à la demande), avec Amazon DynamoDB Streams activé par défaut pour afficher les nouvelles et les anciennes images.

La fonction Lambda sera stockée dans le dossier lambda situé sous le fichier app.py. Ce fichier sera créé ultérieurement. Elle inclura une variable d'environnement APP_TABLE_NAME, qui sera le nom de la table Amazon DynamoDB créée par cette pile. Dans la même fonction, nous accorderons des autorisations de lecture de flux à la fonction Lambda. Enfin, elle s'abonnera aux DynamoDB Streams en tant que source d'événements pour la fonction Lambda.

À la fin du fichier de la méthode __init__, vous appellerez les constructions respectives pour les initialiser dans la pile. Pour les projets plus importants qui nécessitent des composants et des services supplémentaires, il peut être préférable de définir ces constructions en dehors de la pile de base.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

Nous allons maintenant créer une fonction lambda très simple qui imprimera les journaux sur Amazon CloudWatch. Pour ce faire, créez un nouveau dossier appelé lambda.

mkdir lambda touch app.py

À l'aide de votre éditeur de texte préféré, ajoutez le contenu suivant au fichier app.py :

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

Assurez-vous que vous vous trouvez dans le dossier /ddb_filters/, tapez la commande suivante pour créer l'exemple d'application :

cdk deploy

à un moment donné, il vous sera demandé de confirmer si vous souhaitez déployer la solution. Acceptez les modifications en saisissant Y.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

Une fois les modifications déployées, ouvrez votre AWS console et ajoutez un élément à votre tableau.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Les CloudWatch journaux devraient désormais contenir toutes les informations de cette entrée.

Exemples de filtrage

  • Uniquement les produits qui correspondent à un État donné

Ouvrez le fichier ddb_filters/ddb_filters/ddb_filters_stack.py et modifiez-le pour inclure le filtre correspondant à tous les produits égaux à « FL ». Cela peut être révisé juste sous event_subscription, à la ligne 45.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • Uniquement les éléments qui commencent par certaines valeurs dans PK et SK

Modifiez le script Python de façon à inclure la condition suivante :

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • Soit vous commencez avec des valeurs sur PK et SK, soit le produit provient d'un État donné.

Modifiez le script Python de façon à inclure les conditions suivantes :

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

Notez que la condition OR est ajoutée en ajoutant d'autres éléments au tableau de filtres.

Nettoyage

Localisez la pile de filtres dans la base de votre répertoire de travail et exécutez cdk destroy. Il vous sera demandé de confirmer la suppression de la ressource :

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y