チュートリアル #2: DynamoDB と Lambda での、フィルターを使用したいくつかのイベントの処理。
このチュートリアルでは、AWS Lambda トリガーを作成して、DynamoDB テーブルからのストリームの一部のイベントのみを処理します。
Lambda イベントフィルタリングでは、フィルター式を使用して、処理のために Lambda が関数に送信するイベントを制御できます。DynamoDB ストリームごとに最大 5 つの異なるフィルターを設定できます。バッチ処理ウィンドウを使用している場合、Lambda は新しいイベントそれぞれにフィルター条件を適用して、現在のバッチに含めるかどうかを確認します。
フィルターは FilterCriteria
と呼ばれる構造を介して適用されます。FilterCriteria
の 3 つの主な属性は metadata properties
、data properties
、および filter patterns
です。
DynamoDB Streams イベントの構造の例を次に示します。
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
metadata properties
は、イベントオブジェクトのフィールドです。DynamoDB Streams の場合、metadata properties
は dynamodb
や eventName
のようなフィールドです。
data properties
は、イベント本文のフィールドです。data properties
をフィルタリングするには、それらを適切なキー内の FilterCriteria
に含めるようにしてください。DynamoDB イベントソースのデータキーは NewImage
または OldImage
です。
最後に、フィルタールールは、特定のプロパティに適用するフィルター式を定義します。次に例を示します。
Comparison operator (比較演算子) | 例 | ルール構文 (一部) |
---|---|---|
Null |
製品タイプは NULL |
|
空 |
製品名は空白 |
|
等しい |
州はフロリダに等しい |
|
And |
製品州はフロリダ、製品カテゴリはチョコレート |
|
または |
製品州はフロリダまたはカリフォルニア |
|
Not |
製品州はフロリダ州ではない |
|
存在する |
地産品は存在する |
|
存在しない |
地産品は存在しない |
|
で始まる |
PK は COMPANY から始まる |
|
Lambda 関数には、最大 5 つのイベントフィルタリングパターンを指定できます。これら 5 つのイベントのそれぞれが論理 OR として評価されることに注意してください。そのため、Filter_One
および Filter_Two
という名前の 2 つのフィルターを設定すると、Lambda 関数は Filter_One
OR Filter_Two
を実行します。
注記
Lambda イベントのフィルタリングページには、数値をフィルタリングして比較するオプションがいくつかありますが、DynamoDB のフィルタイベントの場合、DynamoDB の数値は文字列として保存されるため、適用されません。例えば "quantity": { "N": "50"
}
の場合は、"N"
プロパティのおかげでそれが数字だとわかります。
すべてをまとめる - AWS CloudFormation
実際のイベントフィルタリング機能を見ていただくために、CloudFormation テンプレートのサンプルを以下に示します。このテンプレートは、Amazon DynamoDB Streams を有効にしたパーティションキー PK とソートキー SK を含むシンプルな DynamoDB テーブルを生成します。これにより、Amazon Cloudwatch へのログの書き込みと Amazon DynamoDB Streams からのイベントの読み取りを許可する Lambda 関数とシンプルな Lambda 実行ロールが作成されます。また、DynamoDB Streams と Lambda 関数間にイベントソースマッピングも追加されるため、Amazon DynamoDB Streams にイベントが発生するたびに関数を実行できます。
AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn
この CloudFormation テンプレートをデプロイすると、次の Amazon DynamoDB 項目を挿入できます。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
この CloudFormation テンプレートにインラインで組み込まれているシンプルな Lambda 関数により、Lambda 関数の Amazon CloudWatch ロググループのイベントは次のように表示されます。
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
フィルター例
-
特定の州に一致する商品のみ
この例では、CloudFormation テンプレートを変更して、フロリダで生産されたすべての製品 (略称「FL」) と一致するフィルターを含めます。
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
スタックを再デプロイしたら、テーブルに次の DynamoDB 項目を追加できます。この例での製品はカリフォルニア産なので、Lambda 関数ログには表示されないことに注意してください。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
-
PK と SK のうち、ある値で始まるアイテムのみ
この例では、CloudFormation テンプレートを変更して次の条件を含めます。
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
AND 条件では、条件がパターン内になければならないことに注意してください。ここで、キー PK と SK は同じ式内にあり、カンマで区切られます。
PK と SK がある値から始まるか、特定の州産かのどちらかです。
この例では、CloudFormation テンプレートを変更して次の条件を含めます。
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
フィルターセクションに新しいパターンを導入することで、OR 条件が追加されます。
すべてをまとめる - CDK
次のサンプル CDK プロジェクト形成テンプレートでは、イベントフィルタリング機能について説明します。この CDK プロジェクトに取り組む前に、準備スクリプトの実行を含む前提条件をインストールする必要があります。
CDK プロジェクトを作成する
まず、空のディレクトリで cdk init
を呼び出して、新しい AWS CDK プロジェクトを作成します。
mkdir ddb_filters cd ddb_filters cdk init app --language python
この cdk init
コマンドは、プロジェクトフォルダの名前を使用して、クラス、サブフォルダ、ファイルなど、プロジェクトのさまざまな要素に名前を付けます。フォルダ名に含まれるハイフンはすべてアンダースコアに変換されます。それ以外の場合、名前は Python 識別子の形式に従う必要があります。例えば、数字で始めたり、スペースを含めたりはしないでください。
新しいプロジェクトで作業するには、その仮想環境を有効にします。これにより、プロジェクトの依存関係をグローバルにインストールするのではなく、プロジェクトフォルダにローカルにインストールできます。
source .venv/bin/activate python -m pip install -r requirements.txt
注記
これは、仮想環境をアクティブにする Mac/Linux コマンドとして認識されるかもしれません。Python テンプレートには、同じコマンドを Windows で使用できるようにするバッチファイル、source.bat
が含まれています。従来の Windows コマンド、.venv\Scripts\activate.bat
も機能します。AWS CDK Toolkit v1.70.0 以前を使用して AWS CDK プロジェクトを初期化した場合、仮想環境は .venv
ではなく .env
ディレクトリにあります。
基本インフラストラクチャ
任意のテキストエディタでファイル ./ddb_filters/ddb_filters_stack.py
を開きます。このファイルは、AWS CDK プロジェクトを作成したときに自動生成されました。
次に、_create_ddb_table
および _set_ddb_trigger_function
関数を追加します。これらの関数は、プロビジョニングモードのオンデマンドモードでパーティションキー PK とソートキー SK を含む DynamoDB テーブルを作成します。Amazon DynamoDB Streams をデフォルトで有効にして、新しいイメージと古いイメージを表示できます。
Lambda 関数はファイル app.py
の下のフォルダ lambda
に保存されます。このファイルは後で作成されます。これには環境変数 APP_TABLE_NAME
が含まれます。この変数は、このスタックによって作成される Amazon DynamoDB テーブルの名前になります。同じ関数で、Lambda 関数にストリーム読み取り権限を付与します。最後に、Lambda 関数のイベントソースとして DynamoDB Streams にサブスクライブします。
__init__
メソッド内のファイルの最後で、それぞれの構成を呼び出してスタック内で初期化します。追加のコンポーネントやサービスを必要とする大規模なプロジェクトでは、これらの構成を基本スタックの外部で定義するのが最適な場合があります。
import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)
次に、Amazon CloudWatch にログを出力する、非常にシンプルな Lambda 関数を作成します。それには、lambda
という新しいフォルダを作成します。
mkdir lambda touch app.py
任意のテキストエディタを使用して、次の内容を app.py
ファイルに追加します。
import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)
/ddb_filters/
フォルダにいることを確認し、次のコマンドを入力してサンプルアプリケーションを作成します。
cdk deploy
ある時点で、ソリューションをデプロイするかどうか確認する画面が表示されます。Y
を入力して変更を確定します。
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
変更がデプロイされたら、AWS コンソールを開いてテーブルに項目を 1 つ追加します。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
これで CloudWatch ログには、このエントリの情報がすべて含まれているはずです。
フィルター例
-
特定の州に一致する商品のみ
ファイル ddb_filters/ddb_filters/ddb_filters_stack.py
を開き、「FL」に等しいすべての製品と一致するフィルターを含めるように変更します。これは 45 行目の event_subscription
のすぐ下で修正できます。
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
-
PK と SK のうち、ある値で始まる項目のみ
Python スクリプトを次の条件を含むように変更します。
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
-
PK と SK がある値から始まるか、特定の州産かのどちらかです。
Python スクリプトを次の条件を含むように変更します。
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
Filters 配列に要素を追加することで OR 条件が追加されることに注意してください。
クリーンアップ
作業ディレクトリのベースにあるフィルタースタックを見つけて、cdk destroy
を実行します。リソースの削除を確認するメッセージが表示されます。
cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y