

# チュートリアル \$12: DynamoDB と Lambda での、フィルターを使用したいくつかのイベントの処理。
<a name="Streams.Lambda.Tutorial2"></a>

このチュートリアルでは、AWS Lambda トリガーを作成して、DynamoDB テーブルからのストリームの一部のイベントのみを処理します。

**Topics**
+ [すべてをまとめる - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [すべてをまとめる - CDK](#Streams.Lambda.Tutorial2.CDK)

[Lambda イベントフィルタリング](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)では、フィルター式を使用して、処理のために Lambda が関数に送信するイベントを制御できます。DynamoDB ストリームごとに最大 5 つの異なるフィルターを設定できます。バッチ処理ウィンドウを使用している場合、Lambda は新しいイベントそれぞれにフィルター条件を適用して、現在のバッチに含めるかどうかを確認します。

フィルターは `FilterCriteria` と呼ばれる構造を介して適用されます。`FilterCriteria` の 3 つの主な属性は `metadata properties`、`data properties`、および `filter patterns` です。

DynamoDB Streams イベントの構造の例を次に示します。

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` は、イベントオブジェクトのフィールドです。DynamoDB Streams の場合、`metadata properties` は `dynamodb` や `eventName` のようなフィールドです。

`data properties` は、イベント本文のフィールドです。`data properties` をフィルタリングするには、それらを適切なキー内の `FilterCriteria` に含めるようにしてください。DynamoDB イベントソースのデータキーは `NewImage` または `OldImage` です。

最後に、フィルタールールは、特定のプロパティに適用するフィルター式を定義します。次に例を示します。


| 比較演算子 | 例 | ルール構文 (一部) | 
| --- | --- | --- | 
|  Null  |  製品タイプは NULL  |  `{ "product_type": { "S": null } } `  | 
|  空  |  製品名は空白  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  州はフロリダに等しい  |  `{ "state": { "S": ["FL"] } } `  | 
|  And  |  製品州はフロリダ、製品カテゴリはチョコレート  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  または  |  製品州はフロリダまたはカリフォルニア  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  製品州はフロリダ州ではない  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  存在する  |  地産品は存在する  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  存在しない  |  地産品は存在しない  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Begins with  |  PK は COMPANY から始まる  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Lambda 関数には、最大 5 つのイベントフィルタリングパターンを指定できます。これら 5 つのイベントのそれぞれが論理 OR として評価されることに注意してください。そのため、`Filter_One` および `Filter_Two` という名前の 2 つのフィルターを設定すると、Lambda 関数は `Filter_One` OR `Filter_Two` を実行します。

**注記**  
[Lambda イベントのフィルタリング](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)ページには、数値をフィルタリングして比較するオプションがいくつかありますが、DynamoDB のフィルタイベントの場合、DynamoDB の数値は文字列として保存されるため、適用されません。例えば ` "quantity": { "N": "50" }` の場合は、`"N"` プロパティのおかげでそれが数字だとわかります。

## すべてをまとめる - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

実際のイベントフィルタリング機能を見ていただくために、CloudFormation テンプレートのサンプルを以下に示します。このテンプレートは、Amazon DynamoDB Streams を有効にしたパーティションキー PK とソートキー SK を含むシンプルな DynamoDB テーブルを生成します。これにより、Amazon Cloudwatch へのログの書き込みと Amazon DynamoDB Streams からのイベントの読み取りを許可する Lambda 関数とシンプルな Lambda 実行ロールが作成されます。また、DynamoDB Streams と Lambda 関数間にイベントソースマッピングも追加されるため、Amazon DynamoDB Streams にイベントが発生するたびに関数を実行できます。

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

この CloudFormation テンプレートをデプロイすると、次の Amazon DynamoDB 項目を挿入できます。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

この CloudFormation テンプレートにインラインで組み込まれているシンプルな Lambda 関数により、Lambda 関数の Amazon CloudWatch ロググループのイベントは次のように表示されます。

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**フィルター例**
+ **特定の州に一致する商品のみ**

この例では、CloudFormation テンプレートを変更して、フロリダで生産されたすべての製品 (略称「FL」) と一致するフィルターを含めます。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

スタックを再デプロイしたら、テーブルに次の DynamoDB 項目を追加できます。この例での製品はカリフォルニア産なので、Lambda 関数ログには表示されないことに注意してください。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **PK と SK のうち、ある値で始まるアイテムのみ**

この例では、CloudFormation テンプレートを変更して次の条件を含めます。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

AND 条件では、条件がパターン内になければならないことに注意してください。ここで、キー PK と SK は同じ式内にあり、カンマで区切られます。

PK と SK がある値から始まるか、特定の州産かのどちらかです。

この例では、CloudFormation テンプレートを変更して次の条件を含めます。

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

フィルターセクションに新しいパターンを導入することで、OR 条件が追加されます。

## すべてをまとめる - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

次のサンプル CDK プロジェクト形成テンプレートでは、イベントフィルタリング機能について説明します。この CDK プロジェクトに取り組む前に、[準備スクリプトの実行](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)を含む[前提条件をインストール](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)する必要があります。

**CDK プロジェクトを作成する**

まず、空のディレクトリで `cdk init` を呼び出して、新しい AWS CDK プロジェクトを作成します。

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

この `cdk init` コマンドは、プロジェクトフォルダの名前を使用して、クラス、サブフォルダ、ファイルなど、プロジェクトのさまざまな要素に名前を付けます。フォルダ名に含まれるハイフンはすべてアンダースコアに変換されます。それ以外の場合、名前は Python 識別子の形式に従う必要があります。例えば、数字で始めたり、スペースを含めたりはしないでください。

新しいプロジェクトで作業するには、その仮想環境を有効にします。これにより、プロジェクトの依存関係をグローバルにインストールするのではなく、プロジェクトフォルダにローカルにインストールできます。

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**注記**  
これは、仮想環境をアクティブにする Mac/Linux コマンドとして認識されるかもしれません。Python テンプレートには、同じコマンドを Windows で使用できるようにするバッチファイル、`source.bat` が含まれています。従来の Windows コマンド、`.venv\Scripts\activate.bat` も機能します。AWS CDK Toolkit v1.70.0 以前を使用して AWS CDK プロジェクトを初期化した場合、仮想環境は `.venv` ではなく `.env` ディレクトリにあります。

**基本インフラストラクチャ**

任意のテキストエディタでファイル `./ddb_filters/ddb_filters_stack.py` を開きます。このファイルは、AWS CDK プロジェクトを作成したときに自動生成されました。

次に、`_create_ddb_table` および `_set_ddb_trigger_function` 関数を追加します。これらの関数は、プロビジョニングモードのオンデマンドモードでパーティションキー PK とソートキー SK を含む DynamoDB テーブルを作成します。Amazon DynamoDB Streams をデフォルトで有効にして、新しいイメージと古いイメージを表示できます。

Lambda 関数はファイル `app.py` の下のフォルダ `lambda` に保存されます。このファイルは後で作成されます。これには環境変数 `APP_TABLE_NAME` が含まれます。この変数は、このスタックによって作成される Amazon DynamoDB テーブルの名前になります。同じ関数で、Lambda 関数にストリーム読み取り権限を付与します。最後に、Lambda 関数のイベントソースとして DynamoDB Streams にサブスクライブします。

`__init__` メソッド内のファイルの最後で、それぞれの構成を呼び出してスタック内で初期化します。追加のコンポーネントやサービスを必要とする大規模なプロジェクトでは、これらの構成を基本スタックの外部で定義するのが最適な場合があります。

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

次に、Amazon CloudWatch にログを出力する、非常にシンプルな Lambda 関数を作成します。それには、`lambda` という新しいフォルダを作成します。

```
mkdir lambda
touch app.py
```

任意のテキストエディタを使用して、次の内容を `app.py` ファイルに追加します。

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

`/ddb_filters/` フォルダにいることを確認し、次のコマンドを入力してサンプルアプリケーションを作成します。

```
cdk deploy
```

ある時点で、ソリューションをデプロイするかどうか確認する画面が表示されます。`Y` を入力して変更を確定します。

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

変更がデプロイされたら、AWS コンソールを開いてテーブルに項目を 1 つ追加します。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

これで CloudWatch ログには、このエントリの情報がすべて含まれているはずです。

**フィルター例**
+ **特定の州に一致する商品のみ**

ファイル `ddb_filters/ddb_filters/ddb_filters_stack.py` を開き、「FL」に等しいすべての製品と一致するフィルターを含めるように変更します。これは 45 行目の `event_subscription` のすぐ下で修正できます。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **PK と SK のうち、ある値で始まる項目のみ**

Python スクリプトを次の条件を含むように変更します。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **PK と SK がある値から始まるか、特定の州産かのどちらかです。**

Python スクリプトを次の条件を含むように変更します。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Filters 配列に要素を追加することで OR 条件が追加されることに注意してください。

**クリーンアップ**

作業ディレクトリのベースにあるフィルタースタックを見つけて、`cdk destroy` を実行します。リソースの削除を確認するメッセージが表示されます。

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```