Tutorial #2: Filter verwenden, um einige Ereignisse mit DynamoDB und Lambda zu verarbeiten - Amazon-DynamoDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Tutorial #2: Filter verwenden, um einige Ereignisse mit DynamoDB und Lambda zu verarbeiten

In diesem Tutorial erstellen Sie einen AWS Lambda Trigger, um nur einige Ereignisse in einem Stream aus einer DynamoDB-Tabelle zu verarbeiten.

Über die Lambda-Ereignisfilterung können Sie mithilfe von Filterausdrücken steuern, welche Ereignisse Lambda zur Verarbeitung an Ihre Funktion sendet. Sie können bis zu 5 verschiedene Filter pro DynamoDB-Streams konfigurieren. Wenn Sie Batching-Fenster verwenden, wendet Lambda die Filterkriterien auf jedes neue Ereignis an, um festzustellen, ob es dem aktuellen Batch hinzugefügt werden soll.

Filter werden über Strukturen, sogenannte FilterCriteria, angewendet. Die 3 Hauptattribute von FilterCriteria sind metadata properties, data properties und filter patterns.

Hier ist eine Beispielstruktur eines DynamoDB-Streams-Ereignisses:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

metadata properties sind die Felder des Ereignisobjekts. Im Falle von DynamoDB-Streams sind metadata properties Felder wie dynamodb oder eventName.

data properties sind die Felder des Ereignistexts. Um nach data properties zu filtern, müssen sie in FilterCriteria im richtigen Schlüssel eingeschlossen sein. Für DynamoDB-Ereignisquellen lautet der Datenschlüssel NewImage oder OldImage.

Schließlich definieren Filterregeln den Filterausdruck, den Sie auf eine bestimmte Eigenschaft anwenden möchten. Hier sind einige Beispiele:

Vergleichsoperator Beispiel Regelsyntax (partiell)

Null

Der Produkttyp ist null.

{ "product_type": { "S": null } }

Leer

Der Produktname ist leer.

{ "product_name": { "S": [ ""] } }

Gleichheitszeichen

Der Bundesstaat ist Florida.

{ "state": { "S": ["FL"] } }

And

Der Produktstatus ist Florida und die Produktkategorie „Chocolate“ (Schokolade).

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Oder

Der Produktstatus ist Florida oder Kalifornien.

{ "state": { "S": ["FL","CA"] } }

Nicht

Der Produktstatus ist nicht Florida.

{"state": {"S": [{"anything-but": ["FL"]}]}}

Vorhanden

Produkt „Homemade“ (Hausgemacht) ist vorhanden.

{"homemade": {"S": [{"exists": true}]}}

Nicht vorhanden

Produkt „Homemade“ (Hausgemacht) ist nicht vorhanden.

{"homemade": {"S": [{"exists": false}]}}

Beginnt mit

PK beginnt mit COMPANY

{"PK": {"S": [{"prefix": "COMPANY"}]}}

Sie können bis zu 5 Ereignisfilterungsmuster für eine Lambda-Funktion angeben. Beachten Sie, dass jedes dieser 5 Ereignisse als logisches ODER ausgewertet wird. Wenn Sie also zwei Filter namens Filter_One und Filter_Two konfigurieren, führt die Lambda-Funktion Filter_One ODER Filter_Two aus.

Anmerkung

Auf der Seite Lambda-Ereignisfilterung sind einige Optionen zum Filtern und Vergleichen numerischer Werte vorhanden. Im Falle von DynamoDB-Filterereignissen gelten diese jedoch nicht, da Zahlen in DynamoDB als Zeichenfolgen gespeichert werden. Bei "quantity": { "N": "50" } beispielsweise wissen wir aufgrund der Eigenschaft "N", dass es sich um eine Zahl handelt.

Zusammenführung – AWS CloudFormation

Um die Funktionalität der Ereignisfilterung in der Praxis zu veranschaulichen, finden Sie hier eine CloudFormation Beispielvorlage. Diese Vorlage generiert eine einfache DynamoDB-Tabelle mit einem Partitionsschlüssel PK und einem Sortierschlüssel SK mit aktiviertem Amazon DynamoDB Streams. Sie erstellt eine Lambda-Funktion und eine einfache Lambda-Ausführungsrolle, die das Schreiben von Protokollen in Amazon Cloudwatch und das Lesen der Ereignisse aus dem Amazon-DynamoDB-Stream ermöglichen. Zudem fügt sie die Ereignisquellenzuordnung zwischen den DynamoDB-Streams und der Lambda-Funktion hinzu, sodass die Funktion bei jedem Ereignis im Amazon-DymamoDB-Stream ausgeführt werden kann.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

Nachdem Sie diese Cloud-Formation-Vorlage bereitgestellt haben, können Sie das folgende Amazon-DynamoDB-Element einfügen:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Dank der einfachen Lambda-Funktion, die direkt in dieser Cloud-Formation-Vorlage enthalten ist, sehen Sie die Ereignisse in den CloudWatch Amazon-Protokollgruppen für die Lambda-Funktion wie folgt:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

Filterbeispiele

  • Nur Produkte, die einem bestimmten Bundesstaat entsprechen

In diesem Beispiel wird die CloudFormation Vorlage dahingehend geändert, dass sie einen Filter enthält, der allen Produkten aus Florida mit der Abkürzung „FL“ entspricht.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Wenn Sie den Stack erneut bereitstellen, können Sie das folgende DynamoDB-Element zur Tabelle hinzufügen. Beachten Sie, dass es nicht in den Lambda-Funktionsprotokollen angezeigt wird, da das Produkt in diesem Beispiel aus Kalifornien stammt.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • Nur die Elemente, die mit einigen Werten im PK und SK beginnen

In diesem Beispiel wird die CloudFormation Vorlage so geändert, dass sie die folgende Bedingung enthält:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Beachten Sie, dass die AND Bedingung erfordert, dass sich die Bedingung innerhalb des Musters befindet, wobei sich die Schlüssel PK und SK in demselben Ausdruck befinden, getrennt durch Komma.

Entweder mit einigen Werten für PK und SK beginnen oder stammt aus bestimmten Bundesstaat.

In diesem Beispiel wird die CloudFormation Vorlage so geändert, dass sie die folgenden Bedingungen enthält:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

Beachten Sie, dass die OR-Bedingung durch Einführung neuer Muster in den Filterabschnitt hinzugefügt wird.

Zusammenführung – CDK

In der folgenden Beispielvorlage zur CDK Projektgründung werden die Funktionen zur Ereignisfilterung beschrieben. Bevor Sie mit diesem CDK Projekt arbeiten können, müssen Sie die Voraussetzungen, einschließlich der Ausführung von Vorbereitungsskripten, installieren.

Erstellen Sie ein Projekt CDK

Erstellen Sie zunächst ein neues AWS CDK Projekt, indem Sie es cdk init in einem leeren Verzeichnis aufrufen.

mkdir ddb_filters cd ddb_filters cdk init app --language python

Im Befehl cdk init wird der Name des Projektordners zur Benennung verschiedener Elemente des Projekts verwendet, einschließlich Klassen, Unterordnern und Dateien. Bindestriche im Ordnernamen werden in Unterstriche umgewandelt. Ansonsten sollte der Name dem Format eines Python-Bezeichners folgen. Er sollte beispielsweise nicht mit einer Zahl beginnen und keine Leerzeichen enthalten.

Um mit dem neuen Projekt zu arbeiten, aktivieren Sie seine virtuelle Umgebung. Dadurch können die Abhängigkeiten des Projekts lokal im Projektordner installiert werden und müssen nicht global installiert werden.

source .venv/bin/activate python -m pip install -r requirements.txt
Anmerkung

Sie kennen diesen Befehl vielleicht als Mac-/Linux-Befehl zum Aktivieren einer virtuellen Umgebung. Die Python-Vorlagen enthalten eine Batch-Datei, source.bat, die die Verwendung desselben Befehls unter Windows ermöglicht. Der traditionelle Windows-Befehl .venv\Scripts\activate.bat funktioniert ebenfalls. Wenn Sie Ihr AWS CDK Projekt mit AWS CDK Toolkit v1.70.0 oder früher initialisiert haben, befindet sich Ihre virtuelle Umgebung stattdessen im Verzeichnis. .env .venv

Grundlegende Infrastruktur

Öffnen Sie die Datei ./ddb_filters/ddb_filters_stack.py in einem Texteditor Ihrer Wahl. Diese Datei wurde auto generiert, als Sie das AWS CDK Projekt erstellt haben.

Fügen Sie als Nächstes die Funktionen _create_ddb_table und _set_ddb_trigger_function hinzu. Diese Funktionen erstellen eine DynamoDB-Tabelle mit dem Partitionsschlüssel PK und dem Sortierschlüssel SK im Bereitstellungsmodus/On-Demand-Modus, wobei Amazon DynamoDB Streams standardmäßig aktiviert ist, um neue und alte Bilder anzuzeigen.

Die Lambda-Funktion wird im Ordner lambda unter der Datei app.py gespeichert. Diese Datei wird später erstellt. Sie wird eine Umgebungsvariable, APP_TABLE_NAME, enthalten. Hierbei wird es sich um den Namen der Amazon-DynamoDB-Tabelle handeln, die von diesem Stack erstellt wurde. In derselben Funktion werden wir der Lambda-Funktion Stream-Leseberechtigungen erteilen. Schließlich wird sie die DynamoDB Streams als Ereignisquelle für die Lambda-Funktion abonnieren.

Am Ende der Datei, in der __init__-Methode, werden Sie die entsprechenden Konstrukte aufrufen, um sie im Stack zu initialisieren. Bei größeren Projekten, die zusätzliche Komponenten und Services erfordern, ist es möglicherweise am besten, diese Konstrukte außerhalb des Basis-Stacks zu definieren.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

Jetzt werden wir eine sehr einfache Lambda-Funktion erstellen, die die Protokolle in Amazon CloudWatch druckt. Erstellen Sie zu diesem Zweck einen neuen Ordner namens lambda.

mkdir lambda touch app.py

Fügen Sie der Datei app.py über Ihren bevorzugten Texteditor den folgenden Inhalt hinzu:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

Stellen Sie sicher, dass Sie sich im Ordner /ddb_filters/ befinden, und geben Sie zum Erstellen der Beispielanwendung den folgenden Befehl ein:

cdk deploy

Irgendwann werden Sie aufgefordert, zu bestätigen, dass Sie die Lösung bereitstellen möchten. Akzeptieren Sie die Änderungen durch Eingabe von Y.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

Sobald die Änderungen bereitgestellt sind, öffnen Sie Ihre AWS Konsole und fügen Sie Ihrer Tabelle ein Element hinzu.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

Die CloudWatch Protokolle sollten jetzt alle Informationen aus diesem Eintrag enthalten.

Filterbeispiele

  • Nur Produkte, die einem bestimmten Bundesstaat entsprechen

Öffnen Sie die Datei ddb_filters/ddb_filters/ddb_filters_stack.py und ändern Sie sie so, dass sie den Filter enthält, der alle Produkte abgleicht, die gleich „FL“ sind. Dies kann direkt unter der event_subscription in Zeile 45 geändert werden.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • Nur die Elemente, die mit einigen Werten im PK und SK beginnen

Ändern Sie das Python-Skript, um die folgende Bedingung aufzunehmen:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • Entweder mit einigen Werten für PK und SK beginnen oder stammt aus bestimmten Bundesstaat.

Ändern Sie das Python-Skript, um die folgenden Bedingungen aufzunehmen:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

Beachten Sie, dass die OR-Bedingung durch Hinzufügen weiterer Elemente zum Filter-Array hinzugefügt wird.

Bereinigen

Suchen Sie den Filter-Stack in der Basis Ihres Arbeitsverzeichnisses und führen Sie cdk destroy aus. Sie werden aufgefordert, das Löschen der Ressource zu bestätigen:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y