本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
教學課程 #2:使用篩選條件處理 DynamoDB 和 Lambda 的某些事件
在本教學課程中,您將建立 AWS Lambda 觸發程序,以僅處理來自 DynamoDB 資料表之串流中的某些事件。
透過 Lambda 事件篩選,您可以使用篩選條件表達式來控制 Lambda 要傳送哪些事件給函數進行處理。對於每個 DynamoDB 串流,最多可以設定 5 個不同的篩選條件。如果您使用批次處理間隔,Lambda 會將篩選條件標準套用至每個新事件,以查看是否應將其納入目前的批次中。
篩選條件會透過稱為 FilterCriteria
的結構進行套用。FilterCriteria
的 3 項主要屬性是 metadata properties
、data properties
和 filter patterns
。
以下是 DynamoDB Streams 事件的範例結構:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
metadata properties
是事件物件的欄位。若是 DynamoDB Streams,metadata properties
是類似 dynamodb
或 eventName
的欄位。
data properties
是事件主體的欄位。若要篩選 data properties
,請務必將資料屬性包含在適當金鑰內的 FilterCriteria
之中。對於 DynamoDB 事件來源,資料金鑰為 NewImage
或 OldImage
。
最後,篩選條件規則將會定義您要套用至特定屬性的篩選條件表達式。以下是一些範例:
比較運算子 | 範例 | 規則語法 (部分) |
---|---|---|
Null |
產品類型為 Null |
|
空白 |
產品名稱為空 |
|
等於 |
州等於佛羅里達州 |
|
及 |
產品原產州等於佛羅里達州,且產品類別為巧克力 |
|
或 |
產品原產州是佛羅里達州或加州 |
|
Not |
產品原產州不是佛羅里達州 |
|
存在 |
自製產品存在 |
|
不存在 |
自製產品不存在 |
|
開頭為 |
PK 開頭為 COMPANY |
|
您可以為一個 Lambda 函數最多指定 5 個事件篩選模式。請注意,這 5 個事件中的每一個都將評估為邏輯 OR。因此,如果您設定兩個分別名為 Filter_One
和 Filter_Two
的篩選條件,Lambda 函數將會執行 Filter_One
OR Filter_Two
。
注意
Lambda 事件篩選頁面中有部分選項可用於篩選和比較數值,但不適用於 DynamoDB 篩選條件事件,因為 DynamoDB 中的數字會儲存為字串。例如 "quantity": { "N": "50"
}
,從 "N"
屬性可以得知這是一個數字。
整合練習 - AWS CloudFormation
若要在實務中顯示事件篩選功能,以下是範例 CloudFormation範本。此範本將產生一個簡單的 DynamoDB 資料表,其中含有分割區索引鍵 PK 和排序索引鍵 SK,且已啟用 Amazon DynamoDB Streams。它會建立一個 Lambda 函數和一個簡單的 Lambda 執行角色,以允許將日誌寫入 Amazon CloudWatch,並從 Amazon DynamoDB Streams 讀取事件。它也會在 DynamoDB Streams 和 Lambda 函數之間新增事件來源映射,以便每次在 Amazon DynamoDB Streams 中發生事件時,都可以執行該函數。
AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn
部署此雲端編組範本之後,即可插入下列 Amazon DynamoDB 項目:
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
由於此雲端形成範本中包含內嵌的簡單 lambda 函數,您會在 Amazon CloudWatch 日誌群組中看到 lambda 函數的事件,如下所示:
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
篩選條件範例
-
僅限符合指定州的產品
此範例會修改 CloudFormation 範本以包含篩選條件,以符合來自佛羅里達州的所有產品,以及縮寫為「FL」。
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
重新部署堆疊後,即可將下列 DynamoDB 項目新增至資料表。請注意,它不會出現在 Lambda 函數日誌中,因為此範例中的產品來自加州。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
-
僅限以 PK 和 SK 中部分值開頭的項目
此範例會修改 CloudFormation 範本,以包含下列條件:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
請注意,AND條件要求條件位於模式內,其中金鑰 PK 和 SK 位於以逗號分隔的相同運算式中。
以 PK 和 SK 中的部分值開頭或來自特定州。
此範例會修改 CloudFormation 範本,以包含下列條件:
EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST
請注意,新增 OR 條件的方法是在篩選條件區段中引入新的模式。
整合練習 - CDK
下列範例CDK專案形成範本會逐步解說事件篩選功能。使用此CDK專案之前,您需要安裝先決條件,包括執行準備指令碼 。
建立CDK專案
首先,cdk init
在空目錄中叫用 建立新 AWS CDK 專案。
mkdir ddb_filters cd ddb_filters cdk init app --language python
cdk init
命令使用專案資料夾的名稱來命名專案的各種元素,包括類別、子資料夾和檔案。資料夾名稱中的任何連字號都會轉換為底線。否則,該名稱應遵循 Python 識別符的形式。例如,不應以數字開頭或包含空格。
若要使用新專案,請啟用其虛擬環境。如此可將專案的相依性安裝在本機專案資料夾中,而不是全域安裝。
source .venv/bin/activate python -m pip install -r requirements.txt
注意
您可以將其識別為用來啟用虛擬環境的 Mac/Linux 命令。Python 範本包含一個批次檔案 source.bat
,允許在 Windows 上使用相同的命令。傳統的 Windows 命令 .venv\Scripts\activate.bat
也同樣適用。如果您使用 AWS CDK Toolkit v1.70.0 或更早版本初始化 AWS CDK 專案,您的虛擬環境位於 .env
目錄中,而不是 .venv
。
基本基礎架構
在您偏好的文字編輯器中開啟檔案 ./ddb_filters/ddb_filters_stack.py
。當您建立 AWS CDK 專案時,系統會自動產生此檔案。
接下來,新增函數 _create_ddb_table
和 _set_ddb_trigger_function
。這些函數將會建立佈建模式或隨需模式的 DynamoDB 資料表,其中含有分割區索引鍵 PK 和排序索引鍵 SK,且依預設會啟用 Amazon DynamoDB Streams 以顯示新舊映像。
Lambda 函數會儲存在資料夾 lambda
內的檔案 app.py
中。此檔案會在稍後建立。它會包含環境變數 APP_TABLE_NAME
(將成為此堆疊所建立 Amazon DynamoDB 資料表的名稱)。在相同的函數中,我們將對 Lambda 函數授予串流讀取權限。最後,它會訂閱 DynamoDB Streams 做為 Lambda 函數的事件來源。
在 __init__
方法中的檔案尾端,您將會呼叫相應的建構以在堆疊中對其初始化。對於需要其他元件和服務的較大專案,最好在基本堆疊之外定義這些建構。
import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)
現在,我們將建立非常簡單的 lambda 函數,將日誌列印到 Amazon CloudWatch。若要執行此操作,請建立名為 lambda
的新資料夾。
mkdir lambda touch app.py
使用您偏好的文字編輯器,將下列內容新增至 app.py
檔案:
import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)
確定您位於 /ddb_filters/
資料夾中,並輸入下列命令以建立範例應用程式:
cdk deploy
在特定時間點,系統會要求您確認是否要部署解決方案。輸入 Y
以接受變更。
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
部署變更後,請開啟 AWS 主控台並將一個項目新增至資料表。
{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }
CloudWatch 日誌現在應包含此項目的所有資訊。
篩選條件範例
-
僅限符合指定州的產品
開啟檔案 ddb_filters/ddb_filters/ddb_filters_stack.py
加以修改,使其納入符合所有等於 "FL" 之產品的篩選條件。這可以在第 45 行中 event_subscription
的正下方進行修改。
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
-
僅限以 PK 和 SK 中部分值開頭的項目
修改 Python 指令碼以納入下列條件:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
-
應以位於 PK 和 SK 中的部分值開頭或來自特定州。
修改 python 指令碼以納入下列條件:
event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
請注意,新增 OR 條件的方法是在篩選條件陣列中新增更多元素。
清除
請在工作目錄的底部找到篩選條件堆疊,然後執行 cdk destroy
。系統會要求您確認刪除此資源:
cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y