

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 教學課程 \$12：使用篩選條件來處理 DynamoDB 和 Lambda 的所有事件
<a name="Streams.Lambda.Tutorial2"></a>

在本教學課程中，您將建立 AWS Lambda 觸發程序，以僅處理來自 DynamoDB 資料表之串流中的某些事件。

**Topics**
+ [全部整合 - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [整合練習 - CDK](#Streams.Lambda.Tutorial2.CDK)

透過 [Lambda 事件篩選](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，您可以使用篩選條件表達式來控制 Lambda 要傳送哪些事件給函數進行處理。對於每個 DynamoDB 串流，最多可以設定 5 個不同的篩選條件。如果您使用批次處理間隔，Lambda 會將篩選條件標準套用至每個新事件，以查看是否應將其納入目前的批次中。

篩選條件會透過稱為 `FilterCriteria` 的結構進行套用。`FilterCriteria` 的 3 項主要屬性是 `metadata properties`、`data properties` 和 `filter patterns`。

以下是 DynamoDB Streams 事件的範例結構：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` 是事件物件的欄位。若是 DynamoDB Streams，`metadata properties` 是類似 `dynamodb` 或 `eventName` 的欄位。

`data properties` 是事件主體的欄位。若要篩選 `data properties`，請務必將資料屬性包含在適當金鑰內的 `FilterCriteria` 之中。對於 DynamoDB 事件來源，資料金鑰為 `NewImage` 或 `OldImage`。

最後，篩選條件規則將會定義您要套用至特定屬性的篩選條件表達式。以下是一些範例：


| 比較運算子 | 範例 | 規則語法 (部分) | 
| --- | --- | --- | 
|  Null  |  產品類型為 Null  |  `{ "product_type": { "S": null } } `  | 
|  空白  |  產品名稱為空  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  州等於佛羅里達州  |  `{ "state": { "S": ["FL"] } } `  | 
|  及  |  產品原產州等於佛羅里達州，且產品類別為巧克力  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  或  |  產品原產州是佛羅里達州或加州  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  產品原產州不是佛羅里達州  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  存在  |  自製產品存在  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  不存在  |  自製產品不存在  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Begins with  |  PK 以 COMPANY 開頭  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

您可以為一個 Lambda 函式最多指定 5 個事件篩選模式。請注意，這 5 個事件中的每一個都將評估為邏輯 OR。因此，如果您設定兩個分別名為 `Filter_One` 和 `Filter_Two` 的篩選條件，Lambda 函式將會執行 `Filter_One` OR `Filter_Two`。

**注意**  
[Lambda 事件篩選](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)頁面中有部分選項可用於篩選和比較數值，但不適用於 DynamoDB 篩選條件事件，因為 DynamoDB 中的數字會儲存為字串。例如 ` "quantity": { "N": "50" }`，從 `"N"` 屬性可以得知這是一個數字。

## 全部整合 - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

若要在實務中顯示事件篩選功能，請參考以下 CloudFormation 範本的範例。此範本將產生一個簡單的 DynamoDB 資料表，其中含有分割區索引鍵 PK 和排序索引鍵 SK，且已啟用 Amazon DynamoDB Streams。它會建立一個 Lambda 函式和一個簡單的 Lambda 執行角色，以允許將日誌寫入 Amazon CloudWatch，並從 Amazon DynamoDB Streams 讀取事件。它也會在 DynamoDB Streams 和 Lambda 函式之間新增事件來源映射，以便每次在 Amazon DynamoDB Streams 中發生事件時，都可以執行該函數。

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

部署此雲端編組範本之後，即可插入下列 Amazon DynamoDB 項目：

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

由於此雲端編組範本內嵌簡單的 Lambda 函式，因此可以在 Amazon CloudWatch 日誌群組中查看 Lambda 函式事件，如下所示：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**篩選條件範例**
+ **僅限符合指定州的產品**

此範例修改了 CloudFormation 範本，使其納入符合所有來自佛羅里達州 (縮寫為 "FL") 之產品的篩選條件。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

重新部署堆疊後，即可將下列 DynamoDB 項目新增至資料表。請注意，它不會出現在 Lambda 函式日誌中，因為此範例中的產品來自加州。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **僅限以 PK 和 SK 中部分值開頭的項目**

此範例修改了 CloudFormation 範本，使其納入下列條件：

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

請注意，AND 條件要求條件位於模式內部，且其中的索引鍵 PK 和 SK 位於相同表達式內並以逗號分隔。

以 PK 和 SK 中的部分值開頭或來自特定州。

此範例修改了 CloudFormation 範本，使其納入下列條件：

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

請注意，新增 OR 條件的方法是在篩選條件區段中引入新的模式。

## 整合練習 - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

以下範例 CDK 專案編組範本逐步解說事件篩選功能。在使用此 CDK 專案之前，必須先[安裝必要條件](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)，包括[執行準備指令碼](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)。

**建立 CDK 專案**

首先，`cdk init`在空目錄中叫用 來建立新的 AWS CDK 專案。

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

`cdk init` 命令使用專案資料夾的名稱來命名專案的各種元素，包括類別、子資料夾和檔案。資料夾名稱中的任何連字號都會轉換為底線。否則，該名稱應遵循 Python 識別符的形式。例如，不應以數字開頭或包含空格。

若要使用新專案，請啟用其虛擬環境。如此可將專案的相依性安裝在本機專案資料夾中，而不是全域安裝。

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**注意**  
您可以將其識別為用來啟用虛擬環境的 Mac/Linux 命令。Python 範本包含一個批次檔案 `source.bat`，允許在 Windows 上使用相同的命令。傳統的 Windows 命令 `.venv\Scripts\activate.bat` 也同樣適用。如果您使用 AWS CDK Toolkit v1.70.0 或更早版本初始化 AWS CDK 專案，您的虛擬環境位於 `.env`目錄中，而不是 `.venv`。

**基本基礎架構**

在您偏好的文字編輯器中開啟檔案 `./ddb_filters/ddb_filters_stack.py`。此檔案會在您建立 AWS CDK 專案時自動產生。

接下來，新增函數 `_create_ddb_table` 和 `_set_ddb_trigger_function`。這些函數將會建立佈建模式或隨需模式的 DynamoDB 資料表，其中含有分割區索引鍵 PK 和排序索引鍵 SK，且依預設會啟用 Amazon DynamoDB Streams 以顯示新舊映像。

Lambda 函式會儲存在資料夾 `lambda` 內的檔案 `app.py` 中。此檔案會在稍後建立。它會包含環境變數 `APP_TABLE_NAME` (將成為此堆疊所建立 Amazon DynamoDB 資料表的名稱)。在相同的函數中，我們將對 Lambda 函式授予串流讀取權限。最後，它會訂閱 DynamoDB Streams 做為 Lambda 函式的事件來源。

在 `__init__` 方法中的檔案尾端，您將會呼叫相應的建構以在堆疊中對其初始化。對於需要其他元件和服務的較大專案，最好在基本堆疊之外定義這些建構。

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

現在，我們將建立一個非常簡單的 Lambda 函式，以將日誌列印至 Amazon CloudWatch。若要執行此操作，請建立名為 `lambda` 的新資料夾。

```
mkdir lambda
touch app.py
```

使用您偏好的文字編輯器，將下列內容新增至 `app.py` 檔案：

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

確定您位於 `/ddb_filters/` 資料夾中，並輸入下列命令以建立範例應用程式：

```
cdk deploy
```

在特定時間點，系統會要求您確認是否要部署解決方案。輸入 `Y` 以接受變更。

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

部署變更後，請開啟 AWS 主控台，並將一個項目新增至資料表。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

此時，CloudWatch 日誌應已包含此項目中的所有資訊。

**篩選條件範例**
+ **僅限符合指定州的產品**

開啟檔案 `ddb_filters/ddb_filters/ddb_filters_stack.py` 加以修改，使其納入符合所有等於 "FL" 之產品的篩選條件。這可以在第 45 行中 `event_subscription` 的正下方進行修改。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **僅限以 PK 和 SK 中部分值開頭的項目**

修改 Python 指令碼以納入下列條件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **以位於 PK 和 SK 中的部分值開頭或來自特定州。**

修改 python 指令碼以納入下列條件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

請注意，新增 OR 條件的方法是在篩選條件陣列中新增更多元素。

**清除**

請在工作目錄的底部找到篩選條件堆疊，然後執行 `cdk destroy`。系統會要求您確認刪除此資源：

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```