Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Tutorial #2: Filter verwenden, um einige Ereignisse mit DynamoDB und Lambda zu verarbeiten
In diesem Tutorial erstellen Sie einen AWS Lambda Trigger, um nur einige Ereignisse in einem Stream aus einer DynamoDB-Tabelle zu verarbeiten.
Über die Lambda-Ereignisfilterung können Sie mithilfe von Filterausdrücken steuern, welche Ereignisse Lambda zur Verarbeitung an Ihre Funktion sendet. Sie können bis zu 5 verschiedene Filter pro DynamoDB-Streams konfigurieren. Wenn Sie Batching-Fenster verwenden, wendet Lambda die Filterkriterien auf jedes neue Ereignis an, um festzustellen, ob es dem aktuellen Batch hinzugefügt werden soll.
Filter werden über Strukturen, sogenannte FilterCriteria
, angewendet. Die 3 Hauptattribute von FilterCriteria
sind metadata properties
, data properties
und filter patterns
.
Hier ist eine Beispielstruktur eines DynamoDB-Streams-Ereignisses:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
metadata properties
sind die Felder des Ereignisobjekts. Im Falle von DynamoDB-Streams sind metadata properties
Felder wie dynamodb
oder eventName
.
data properties
sind die Felder des Ereignistexts. Um nach data properties
zu filtern, müssen sie in FilterCriteria
im richtigen Schlüssel eingeschlossen sein. Für DynamoDB-Ereignisquellen lautet der Datenschlüssel NewImage
oder OldImage
.
Schließlich definieren Filterregeln den Filterausdruck, den Sie auf eine bestimmte Eigenschaft anwenden möchten. Hier sind einige Beispiele:
Vergleichsoperator | Beispiel | Regelsyntax (partiell) |
---|---|---|
Null |
Der Produkttyp ist null. |
|
Leer |
Der Produktname ist leer. |
|
Gleichheitszeichen |
Der Bundesstaat ist Florida. |
|
And |
Der Produktstatus ist Florida und die Produktkategorie „Chocolate“ (Schokolade). |
|
Oder |
Der Produktstatus ist Florida oder Kalifornien. |
|
Nicht |
Der Produktstatus ist nicht Florida. |
|
Vorhanden |
Produkt „Homemade“ (Hausgemacht) ist vorhanden. |
|
Nicht vorhanden |
Produkt „Homemade“ (Hausgemacht) ist nicht vorhanden. |
|
Beginnt mit |
PK beginnt mit COMPANY |
|
Sie können bis zu 5 Ereignisfilterungsmuster für eine Lambda-Funktion angeben. Beachten Sie, dass jedes dieser 5 Ereignisse als logisches ODER ausgewertet wird. Wenn Sie also zwei Filter namens Filter_One
und Filter_Two
konfigurieren, führt die Lambda-Funktion Filter_One
ODER Filter_Two
aus.
Anmerkung
Auf der Seite Lambda-Ereignisfilterung sind einige Optionen zum Filtern und Vergleichen numerischer Werte vorhanden. Im Falle von DynamoDB-Filterereignissen gelten diese jedoch nicht, da Zahlen in DynamoDB als Zeichenfolgen gespeichert werden. Bei "quantity": { "N": "50"
}
beispielsweise wissen wir aufgrund der Eigenschaft "N"
, dass es sich um eine Zahl handelt.
Zusammenführung – AWS CloudFormation
Um die Funktionalität der Ereignisfilterung in der Praxis zu veranschaulichen, finden Sie hier eine CloudFormation Beispielvorlage. Diese Vorlage generiert eine einfache DynamoDB-Tabelle mit einem Partitionsschlüssel PK und einem Sortierschlüssel SK mit aktiviertem Amazon DynamoDB Streams. Sie erstellt eine Lambda-Funktion und eine einfache Lambda-Ausführungsrolle, die das Schreiben von Protokollen in Amazon Cloudwatch und das Lesen der Ereignisse aus dem Amazon-DynamoDB-Stream ermöglichen. Zudem fügt sie die Ereignisquellenzuordnung zwischen den DynamoDB-Streams und der Lambda-Funktion hinzu, sodass die Funktion bei jedem Ereignis im Amazon-DymamoDB-Stream ausgeführt werden kann.
AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn
Nachdem Sie diese Cloud-Formation-Vorlage bereitgestellt haben, können Sie das folgende Amazon-DynamoDB-Element einfügen:
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
Dank der einfachen Lambda-Funktion, die direkt in dieser Cloud-Formation-Vorlage enthalten ist, sehen Sie die Ereignisse in den CloudWatch Amazon-Protokollgruppen für die Lambda-Funktion wie folgt:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
Filterbeispiele
-
Nur Produkte, die einem bestimmten Bundesstaat entsprechen
In diesem Beispiel wird die CloudFormation Vorlage dahingehend geändert, dass sie einen Filter enthält, der allen Produkten aus Florida mit der Abkürzung „FL“ entspricht.
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
Wenn Sie den Stack erneut bereitstellen, können Sie das folgende DynamoDB-Element zur Tabelle hinzufügen. Beachten Sie, dass es nicht in den Lambda-Funktionsprotokollen angezeigt wird, da das Produkt in diesem Beispiel aus Kalifornien stammt.
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
-
Nur die Elemente, die mit einigen Werten im PK und SK beginnen
In diesem Beispiel wird die CloudFormation Vorlage so geändert, dass sie die folgende Bedingung enthält:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
Beachten Sie, dass die AND Bedingung erfordert, dass sich die Bedingung innerhalb des Musters befindet, wobei sich die Schlüssel PK und SK in demselben Ausdruck befinden, getrennt durch Komma.
Entweder mit einigen Werten für PK und SK beginnen oder stammt aus bestimmten Bundesstaat.
In diesem Beispiel wird die CloudFormation Vorlage so geändert, dass sie die folgenden Bedingungen enthält:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
Beachten Sie, dass die OR-Bedingung durch Einführung neuer Muster in den Filterabschnitt hinzugefügt wird.
Zusammenführung – CDK
In der folgenden Beispielvorlage zur CDK Projektgründung werden die Funktionen zur Ereignisfilterung beschrieben. Bevor Sie mit diesem CDK Projekt arbeiten können, müssen Sie die Voraussetzungen, einschließlich der Ausführung von Vorbereitungsskripten, installieren.
Erstellen Sie ein Projekt CDK
Erstellen Sie zunächst ein neues AWS CDK Projekt, indem Sie es cdk init
in einem leeren Verzeichnis aufrufen.
mkdir ddb_filters cd ddb_filters cdk init app --language python
Im Befehl cdk init
wird der Name des Projektordners zur Benennung verschiedener Elemente des Projekts verwendet, einschließlich Klassen, Unterordnern und Dateien. Bindestriche im Ordnernamen werden in Unterstriche umgewandelt. Ansonsten sollte der Name dem Format eines Python-Bezeichners folgen. Er sollte beispielsweise nicht mit einer Zahl beginnen und keine Leerzeichen enthalten.
Um mit dem neuen Projekt zu arbeiten, aktivieren Sie seine virtuelle Umgebung. Dadurch können die Abhängigkeiten des Projekts lokal im Projektordner installiert werden und müssen nicht global installiert werden.
source .venv/bin/activate python -m pip install -r requirements.txt
Anmerkung
Sie kennen diesen Befehl vielleicht als Mac-/Linux-Befehl zum Aktivieren einer virtuellen Umgebung. Die Python-Vorlagen enthalten eine Batch-Datei, source.bat
, die die Verwendung desselben Befehls unter Windows ermöglicht. Der traditionelle Windows-Befehl .venv\Scripts\activate.bat
funktioniert ebenfalls. Wenn Sie Ihr AWS CDK Projekt mit AWS CDK Toolkit v1.70.0 oder früher initialisiert haben, befindet sich Ihre virtuelle Umgebung stattdessen im Verzeichnis. .env
.venv
Grundlegende Infrastruktur
Öffnen Sie die Datei ./ddb_filters/ddb_filters_stack.py
in einem Texteditor Ihrer Wahl. Diese Datei wurde auto generiert, als Sie das AWS CDK Projekt erstellt haben.
Fügen Sie als Nächstes die Funktionen _create_ddb_table
und _set_ddb_trigger_function
hinzu. Diese Funktionen erstellen eine DynamoDB-Tabelle mit dem Partitionsschlüssel PK und dem Sortierschlüssel SK im Bereitstellungsmodus/On-Demand-Modus, wobei Amazon DynamoDB Streams standardmäßig aktiviert ist, um neue und alte Bilder anzuzeigen.
Die Lambda-Funktion wird im Ordner lambda
unter der Datei app.py
gespeichert. Diese Datei wird später erstellt. Sie wird eine Umgebungsvariable, APP_TABLE_NAME
, enthalten. Hierbei wird es sich um den Namen der Amazon-DynamoDB-Tabelle handeln, die von diesem Stack erstellt wurde. In derselben Funktion werden wir der Lambda-Funktion Stream-Leseberechtigungen erteilen. Schließlich wird sie die DynamoDB Streams als Ereignisquelle für die Lambda-Funktion abonnieren.
Am Ende der Datei, in der __init__
-Methode, werden Sie die entsprechenden Konstrukte aufrufen, um sie im Stack zu initialisieren. Bei größeren Projekten, die zusätzliche Komponenten und Services erfordern, ist es möglicherweise am besten, diese Konstrukte außerhalb des Basis-Stacks zu definieren.
import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)
Jetzt werden wir eine sehr einfache Lambda-Funktion erstellen, die die Protokolle in Amazon CloudWatch druckt. Erstellen Sie zu diesem Zweck einen neuen Ordner namens lambda
.
mkdir lambda touch app.py
Fügen Sie der Datei app.py
über Ihren bevorzugten Texteditor den folgenden Inhalt hinzu:
import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)
Stellen Sie sicher, dass Sie sich im Ordner /ddb_filters/
befinden, und geben Sie zum Erstellen der Beispielanwendung den folgenden Befehl ein:
cdk deploy
Irgendwann werden Sie aufgefordert, zu bestätigen, dass Sie die Lösung bereitstellen möchten. Akzeptieren Sie die Änderungen durch Eingabe von Y
.
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
Sobald die Änderungen bereitgestellt sind, öffnen Sie Ihre AWS Konsole und fügen Sie Ihrer Tabelle ein Element hinzu.
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
Die CloudWatch Protokolle sollten jetzt alle Informationen aus diesem Eintrag enthalten.
Filterbeispiele
-
Nur Produkte, die einem bestimmten Bundesstaat entsprechen
Öffnen Sie die Datei ddb_filters/ddb_filters/ddb_filters_stack.py
und ändern Sie sie so, dass sie den Filter enthält, der alle Produkte abgleicht, die gleich „FL“ sind. Dies kann direkt unter der event_subscription
in Zeile 45 geändert werden.
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
-
Nur die Elemente, die mit einigen Werten im PK und SK beginnen
Ändern Sie das Python-Skript, um die folgende Bedingung aufzunehmen:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
-
Entweder mit einigen Werten für PK und SK beginnen oder stammt aus bestimmten Bundesstaat.
Ändern Sie das Python-Skript, um die folgenden Bedingungen aufzunehmen:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
Beachten Sie, dass die OR-Bedingung durch Hinzufügen weiterer Elemente zum Filter-Array hinzugefügt wird.
Bereinigen
Suchen Sie den Filter-Stack in der Basis Ihres Arbeitsverzeichnisses und führen Sie cdk destroy
aus. Sie werden aufgefordert, das Löschen der Ressource zu bestätigen:
cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y