

# 教程 2：对 DynamoDB 和 Lambda 使用筛选器来处理部分事件。
<a name="Streams.Lambda.Tutorial2"></a>

在本教程中，您将创建 AWS Lambda 触发器以处理来自 DynamoDB 表的流中的部分事件。

**Topics**
+ [组合起来 – CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [组合起来 – CDK](#Streams.Lambda.Tutorial2.CDK)

通过 [Lambda 事件筛选](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，您可以使用筛选表达式来控制 Lambda 将哪些事件发送给函数进行处理。每个 DynamoDB 流最多可以配置 5 个不同的筛选器。如果您使用的是批处理时段，则 Lambda 会对每个新事件应用筛选条件，以确定是否将其包括在当前批处理中。

筛选器通过名为 `FilterCriteria` 的结构来应用。`FilterCriteria` 的 3 个主要属性为 `metadata properties`、`data properties` 和 `filter patterns`。

DynamoDB Streams 事件的示例结构如下所示：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` 是事件对象的字段。在 DynamoDB Streams 中，`metadata properties` 是 `dynamodb` 或 `eventName` 这样的字段。

`data properties` 是事件主体的字段。要根据 `data properties` 进行筛选，请确保将它们包含在正确的键内的 `FilterCriteria` 中。对于 DynamoDB 事件源，数据键为 `NewImage` 或 `OldImage`。

最后，筛选条件规则将定义要应用到特定属性的筛选条件表达式。下面是一些示例：


| 比较运算符 | 示例 | 规则语法（部分） | 
| --- | --- | --- | 
|  Null  |  产品类型为 null  |  `{ "product_type": { "S": null } } `  | 
|  Empty  |  产品名称为空  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  州为佛罗里达州  |  `{ "state": { "S": ["FL"] } } `  | 
|  And  |  产品的州为佛罗里达州且产品类别为巧克力  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Or  |  产品的州为佛罗里达州或加利佛尼亚州  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  产品的州不是佛罗里达州  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Exists  |  存在自制产品  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  不存在  |  不存在自制产品  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  始于  |  COMPANY 以 PK 开头  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

您最多可以为一个 Lambda 函数指定 5 个事件筛选模式。请注意，这 5 个事件中的每一个都将作为逻辑 OR 进行求值。因此，如果您配置了名为 `Filter_One` 和 `Filter_Two` 的两个筛选条件，则 Lambda 函数将执行 `Filter_One` OR `Filter_Two`。

**注意**  
在[Lambda 事件筛选](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)页面中，有一些用于筛选和比较数值的选项，但不适用于 DynamoDB 筛选事件，因为 DynamoDB 中的数字作为字符串存储。例如 ` "quantity": { "N": "50" }`，由于 `"N"` 属性，我们知道它是一个数字。

## 组合起来 – CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

为了展示事件筛选功能的实际应用，下面提供了一个示例 CloudFormation 模板。此模板将生成一个简单的 DynamoDB 表，带有分区键 PK 和排序键 SK，并启用了 Amazon DynamoDB Streams。它将创建一个 Lambda 函数和一个简单的 Lambda 执行角色，允许将日志写入 Amazon Cloudwatch，并从 Amazon DynamoDB Stream 中读取事件。它还在 DynamoDB Streams 与 Lambda 函数之间添加事件源映射，因此每次在 Amazon DynamoDB Stream 中出现事件时都可以执行该函数。

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

部署此 CloudFormation 模板后，您可以插入以下 Amazon DynamoDB 项目：

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

由于此 CloudFormation 模板中内嵌了简单的 Lambda 函数，您将在 Amazon CloudWatch 日志组中看到 Lambda 函数的事件，如下所示：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**筛选示例**
+ **仅限与给定州匹配的产品**

此示例修改了 CloudFormation 模板，使其包含一个筛选条件，用于匹配来自佛罗里达州的所有产品，缩写为“FL”。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

重新部署堆栈后，可以将以下 DynamoDB 项目添加到表中。请注意，它不会出现在 Lambda 函数日志中，因为本示例中的产品来自加利佛尼亚州。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **仅限以 PK 和 SK 中某些值开头的项目**

此示例修改 CloudFormation 模板，使其包含以下条件：

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

请注意 AND 条件要求条件位于模式内，其中键 PK 和 SK 位于同一个表达式中，以逗号分隔。

或者是以 PK 和 SK 开头的某些值，或者来自特定状态。

此示例修改 CloudFormation 模板，使其包含以下条件：

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

请注意，OR 条件是通过在筛选条件部分引入新模式来添加的。

## 组合起来 – CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

以下示例 CDK 项目 Formation 模板介绍了事件筛选功能。在使用此 CDK 项目之前，您需要[安装先决条件](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)，包括[运行准备脚本](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)。

**创建 CDK 项目**

首先通过在空目录中调用 `cdk init`，创建一个新的 AWS CDK 项目。

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

`cdk init` 命令使用项目文件夹的名称来命名项目的各种元素，包括类、子文件夹和文件。文件夹名称中的所有连字符都将转换为下划线。否则，该名称应遵循 Python 标识符的格式。例如，名称不应以数字开头，也不能包含空格。

要使用新项目，请激活其虚拟环境。这允许将项目的依赖项安装在本地项目文件夹中，而不是全局安装。

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**注意**  
您可将其视为用于激活虚拟环境的 Mac/Linux 命令。Python 模板包含一个批处理文件 `source.bat`，该文件允许在 Windows 上使用相同的命令。也可以使用传统的 Windows 命令 `.venv\Scripts\activate.bat`。如果您使用 AWS CDK Toolkit v1.70.0 或更早版本来初始化 AWS CDK 项目，则您的虚拟环境位于 `.env` 目录中，而不是 `.venv`。

**基本基础设施**

使用首选文本编辑器打开文件 `./ddb_filters/ddb_filters_stack.py`。此文件在您创建 AWS CDK 项目时自动生成。

接下来，添加函数 `_create_ddb_table` 和 `_set_ddb_trigger_function`。这些函数将在预置模式/按需模式下创建一个 DynamoDB 表，该表带有分区键 PK 和排序键 SK，并且默认启用了 Amazon DynamoDB Streams 以显示新映像和旧映像。

Lambda 函数将存储在文件夹 `lambda` 下的文件 `app.py` 中。此文件将稍后创建。它包含一个环境变量 `APP_TABLE_NAME`，这将成为此堆栈创建的 Amazon DynamoDB 表的名称。在同一个函数中，我们向 Lambda 函数授予流读取权限。最后，它将订阅 DynamoDB Streams 作为 Lambda 函数的事件源。

在 `__init__` 方法中文件的末尾，您将调用相应的构造以在堆栈中初始化它们。对于需要额外组件和服务的较大项目，最好在基础堆栈之外定义这些构造。

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

现在，我们将创建一个非常简单的 Lambda 函数，它将日志输出到 Amazon CloudWatch 中。为此，请创建一个名为 `lambda` 的新文件夹。

```
mkdir lambda
touch app.py
```

使用您常用的文本编辑器，将以下内容添加到 `app.py` 文件中：

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

确保您位于 `/ddb_filters/` 文件夹中，键入以下命令创建示例应用程序：

```
cdk deploy
```

在某个时候，系统会要求您确认是否要部署解决方案。键入 `Y` 接受更改。

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

部署更改后，打开 AWS 控制台并向表中添加一个项目。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

现在，CloudWatch 日志应该包含此条目中的所有信息。

**筛选示例**
+ **仅限与给定州匹配的产品**

打开文件 `ddb_filters/ddb_filters/ddb_filters_stack.py` 并进行修改，使其包含与所有等于“FL”的产品相匹配的筛选条件。可以在第 45 行的 `event_subscription` 下方对其进行修改。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **仅限以 PK 和 SK 中某些值开头的项目**

修改 Python 脚本以包含以下条件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **或者是以 PK 和 SK 开头的某些值，或者来自特定状态。**

修改 Python 脚本以包含以下条件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

请注意，向筛选数组添加更多元素时，将会添加 OR 条件。

**清除**

在工作目录的底部找到筛选器堆栈，然后执行 `cdk destroy`。系统将要求您确认删除资源：

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```