教學課程 #2:使用篩選條件處理 DynamoDB 和 Lambda 的某些事件 - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程 #2:使用篩選條件處理 DynamoDB 和 Lambda 的某些事件

在本教學課程中,您將建立 AWS Lambda 觸發程序,以僅處理來自 DynamoDB 資料表之串流中的某些事件。

透過 Lambda 事件篩選,您可以使用篩選條件表達式來控制 Lambda 要傳送哪些事件給函數進行處理。對於每個 DynamoDB 串流,最多可以設定 5 個不同的篩選條件。如果您使用批次處理間隔,Lambda 會將篩選條件標準套用至每個新事件,以查看是否應將其納入目前的批次中。

篩選條件會透過稱為 FilterCriteria 的結構進行套用。FilterCriteria 的 3 項主要屬性是 metadata propertiesdata propertiesfilter patterns

以下是 DynamoDB Streams 事件的範例結構:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

metadata properties 是事件物件的欄位。若是 DynamoDB Streams,metadata properties 是類似 dynamodbeventName 的欄位。

data properties 是事件主體的欄位。若要篩選 data properties,請務必將資料屬性包含在適當金鑰內的 FilterCriteria 之中。對於 DynamoDB 事件來源,資料金鑰為 NewImageOldImage

最後,篩選條件規則將會定義您要套用至特定屬性的篩選條件表達式。以下是一些範例:

比較運算子 範例 規則語法 (部分)

Null

產品類型為 Null

{ "product_type": { "S": null } }

空白

產品名稱為空

{ "product_name": { "S": [ ""] } }

等於

州等於佛羅里達州

{ "state": { "S": ["FL"] } }

產品原產州等於佛羅里達州,且產品類別為巧克力

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

產品原產州是佛羅里達州或加州

{ "state": { "S": ["FL","CA"] } }

Not

產品原產州不是佛羅里達州

{"state": {"S": [{"anything-but": ["FL"]}]}}

存在

自製產品存在

{"homemade": {"S": [{"exists": true}]}}

不存在

自製產品不存在

{"homemade": {"S": [{"exists": false}]}}

開頭為

PK 開頭為 COMPANY

{"PK": {"S": [{"prefix": "COMPANY"}]}}

您可以為一個 Lambda 函數最多指定 5 個事件篩選模式。請注意,這 5 個事件中的每一個都將評估為邏輯 OR。因此,如果您設定兩個分別名為 Filter_OneFilter_Two 的篩選條件,Lambda 函數將會執行 Filter_One OR Filter_Two

注意

Lambda 事件篩選頁面中有部分選項可用於篩選和比較數值,但不適用於 DynamoDB 篩選條件事件,因為 DynamoDB 中的數字會儲存為字串。例如 "quantity": { "N": "50" },從 "N" 屬性可以得知這是一個數字。

整合練習 - AWS CloudFormation

若要在實務中顯示事件篩選功能,以下是範例 CloudFormation範本。此範本將產生一個簡單的 DynamoDB 資料表,其中含有分割區索引鍵 PK 和排序索引鍵 SK,且已啟用 Amazon DynamoDB Streams。它會建立一個 Lambda 函數和一個簡單的 Lambda 執行角色,以允許將日誌寫入 Amazon CloudWatch,並從 Amazon DynamoDB Streams 讀取事件。它也會在 DynamoDB Streams 和 Lambda 函數之間新增事件來源映射,以便每次在 Amazon DynamoDB Streams 中發生事件時,都可以執行該函數。

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

部署此雲端編組範本之後,即可插入下列 Amazon DynamoDB 項目:

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

由於此雲端形成範本中包含內嵌的簡單 lambda 函數,您會在 Amazon CloudWatch 日誌群組中看到 lambda 函數的事件,如下所示:

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

篩選條件範例

  • 僅限符合指定州的產品

此範例會修改 CloudFormation 範本以包含篩選條件,以符合來自佛羅里達州的所有產品,以及縮寫為「FL」。

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

重新部署堆疊後,即可將下列 DynamoDB 項目新增至資料表。請注意,它不會出現在 Lambda 函數日誌中,因為此範例中的產品來自加州。

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • 僅限以 PK 和 SK 中部分值開頭的項目

此範例會修改 CloudFormation 範本,以包含下列條件:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

請注意,AND條件要求條件位於模式內,其中金鑰 PK 和 SK 位於以逗號分隔的相同運算式中。

以 PK 和 SK 中的部分值開頭或來自特定州。

此範例會修改 CloudFormation 範本,以包含下列條件:

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

請注意,新增 OR 條件的方法是在篩選條件區段中引入新的模式。

整合練習 - CDK

下列範例CDK專案形成範本會逐步解說事件篩選功能。使用此CDK專案之前,您需要安裝先決條件,包括執行準備指令碼

建立CDK專案

首先,cdk init在空目錄中叫用 建立新 AWS CDK 專案。

mkdir ddb_filters cd ddb_filters cdk init app --language python

cdk init 命令使用專案資料夾的名稱來命名專案的各種元素,包括類別、子資料夾和檔案。資料夾名稱中的任何連字號都會轉換為底線。否則,該名稱應遵循 Python 識別符的形式。例如,不應以數字開頭或包含空格。

若要使用新專案,請啟用其虛擬環境。如此可將專案的相依性安裝在本機專案資料夾中,而不是全域安裝。

source .venv/bin/activate python -m pip install -r requirements.txt
注意

您可以將其識別為用來啟用虛擬環境的 Mac/Linux 命令。Python 範本包含一個批次檔案 source.bat,允許在 Windows 上使用相同的命令。傳統的 Windows 命令 .venv\Scripts\activate.bat 也同樣適用。如果您使用 AWS CDK Toolkit v1.70.0 或更早版本初始化 AWS CDK 專案,您的虛擬環境位於 .env 目錄中,而不是 .venv

基本基礎架構

在您偏好的文字編輯器中開啟檔案 ./ddb_filters/ddb_filters_stack.py。當您建立 AWS CDK 專案時,系統會自動產生此檔案。

接下來,新增函數 _create_ddb_table_set_ddb_trigger_function。這些函數將會建立佈建模式或隨需模式的 DynamoDB 資料表,其中含有分割區索引鍵 PK 和排序索引鍵 SK,且依預設會啟用 Amazon DynamoDB Streams 以顯示新舊映像。

Lambda 函數會儲存在資料夾 lambda 內的檔案 app.py 中。此檔案會在稍後建立。它會包含環境變數 APP_TABLE_NAME (將成為此堆疊所建立 Amazon DynamoDB 資料表的名稱)。在相同的函數中,我們將對 Lambda 函數授予串流讀取權限。最後,它會訂閱 DynamoDB Streams 做為 Lambda 函數的事件來源。

__init__ 方法中的檔案尾端,您將會呼叫相應的建構以在堆疊中對其初始化。對於需要其他元件和服務的較大專案,最好在基本堆疊之外定義這些建構。

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

現在,我們將建立非常簡單的 lambda 函數,將日誌列印到 Amazon CloudWatch。若要執行此操作,請建立名為 lambda 的新資料夾。

mkdir lambda touch app.py

使用您偏好的文字編輯器,將下列內容新增至 app.py 檔案:

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

確定您位於 /ddb_filters/ 資料夾中,並輸入下列命令以建立範例應用程式:

cdk deploy

在特定時間點,系統會要求您確認是否要部署解決方案。輸入 Y 以接受變更。

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

部署變更後,請開啟 AWS 主控台並將一個項目新增至資料表。

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

CloudWatch 日誌現在應包含此項目的所有資訊。

篩選條件範例

  • 僅限符合指定州的產品

開啟檔案 ddb_filters/ddb_filters/ddb_filters_stack.py 加以修改,使其納入符合所有等於 "FL" 之產品的篩選條件。這可以在第 45 行中 event_subscription 的正下方進行修改。

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • 僅限以 PK 和 SK 中部分值開頭的項目

修改 Python 指令碼以納入下列條件:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • 應以位於 PK 和 SK 中的部分值開頭或來自特定州。

修改 python 指令碼以納入下列條件:

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

請注意,新增 OR 條件的方法是在篩選條件陣列中新增更多元素。

清除

請在工作目錄的底部找到篩選條件堆疊,然後執行 cdk destroy。系統會要求您確認刪除此資源:

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y