

# 자습서 \$12: 필터를 사용하여 DynamoDB 및 Lambda에서 일부 이벤트 처리
<a name="Streams.Lambda.Tutorial2"></a>

이 자습서에서는 DynamoDB 테이블의 스트림에서 일부 이벤트만 처리하기 위해 AWS Lambda 트리거를 생성합니다.

**Topics**
+ [종합 - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [종합 - CDK](#Streams.Lambda.Tutorial2.CDK)

[Lambda 이벤트 필터링](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)을 사용하면 필터 표현식을 사용하여 Lambda가 어떤 이벤트를 처리를 위해 함수로 보내는지를 제어할 수 있습니다. DynamoDB Streams당 최대 5개의 서로 다른 필터를 구성할 수 있습니다. 일괄 처리 기간을 사용하는 경우 Lambda는 각 새 이벤트에 필터 기준을 적용하여 현재 배치에 포함되어야 하는지 확인합니다.

필터는 `FilterCriteria`라는 구조를 통해 적용됩니다. `FilterCriteria`의 3가지 주요 속성은 `metadata properties`, `data properties` 및 `filter patterns`입니다.

다음은 DynamoDB Streams 이벤트의 예제 구조입니다.

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties`는 이벤트 객체의 필드입니다. DynamoDB Streams의 경우 `metadata properties`은 `dynamodb` 또는 `eventName`과 같은 필드입니다.

`data properties`은 이벤트 본문의 필드입니다. `data properties`을 필터링하려면 적절한 키 내의 `FilterCriteria`에 해당 속성을 포함해야 합니다. DynamoDB 이벤트 소스의 경우 데이터 키는 `NewImage` 또는 `OldImage`입니다.

마지막으로 필터 규칙은 특정 속성에 적용할 필터 표현식을 정의합니다. 여기 몇 가지 예가 있습니다:


| 비교 연산자 | 예제 | 규칙 구문(부분적) | 
| --- | --- | --- | 
|  Null  |  제품 유형이 null입니다.  |  `{ "product_type": { "S": null } } `  | 
|  비어 있음  |  제품 이름이 비어 있습니다.  |  `{ "product_name": { "S": [ ""] } } `  | 
|  같음  |  주가 플로리다와 같습니다.  |  `{ "state": { "S": ["FL"] } } `  | 
|  및  |  제품 주는 플로리다와 같고 제품 범주는 초콜릿과 같습니다.  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  또는  |  제품 주는 플로리다 또는 캘리포니아입니다.  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  제품 주는 플로리다가 아닙니다.  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  존재함  |  홈메이드 제품이 있습니다.  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  존재하지 않음  |  홈메이드 제품이 존재하지 않습니다  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  로 시작함  |  PK는 COMPANY로 시작합니다.  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Lambda 함수에 대해 최대 5개의 이벤트 필터링 패턴을 지정할 수 있습니다. 5개의 이벤트 각각은 논리적 OR로 평가됩니다. 따라서 `Filter_One` 및 `Filter_Two`라는 두 개의 필터를 구성하면 Lambda 함수가 `Filter_One` 또는 `Filter_Two`를 실행합니다.

**참고**  
[Lambda 이벤트 필터링](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) 페이지에는 숫자 값을 필터링하고 비교하는 몇 가지 옵션이 있지만 DynamoDB 필터 이벤트의 경우 DynamoDB의 숫자가 문자열로 저장되기 때문에 적용되지 않습니다. 예를 들어 ` "quantity": { "N": "50" }`의 경우, `"N"` 속성 때문에 숫자임을 알 수 있습니다.

## 종합 - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

이벤트 필터링 기능을 실제로 보여주기 위해 CloudFormation 템플릿 샘플을 소개합니다. 이 템플릿은 Amazon DynamoDB Streams가 활성화된 파티션 키 PK 및 정렬 키 SK가 있는 간단한 DynamoDB 테이블을 생성합니다. 그러면 Amazon Cloudwatch에 로그를 쓰고 Amazon DynamoDB 스트림에서 이벤트를 읽을 수 있는 람다 함수와 간단한 Lambda 실행 역할이 생성됩니다. 또한 DynamoDB Streams와 Lambda 함수 사이에 이벤트 소스 매핑을 추가하므로 Amazon DynamoDB Streams에 이벤트가 있을 때마다 함수를 실행할 수 있습니다.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

이 클라우드 구성 템플릿을 배포한 후 다음과 같은 Amazon DynamoDB 항목을 삽입할 수 있습니다.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

이 클라우드 형성 템플릿에 인라인으로 포함된 간단한 람다 함수 덕분에 다음과 같이 람다 함수에 대한 Amazon CloudWatch 로그 그룹의 이벤트를 볼 수 있습니다.

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**필터링 예제**
+ **지정된 주와 일치하는 제품만**

이 예제에서는 플로리다(약어 'FL')에서 생산되는 모든 제품을 일치시키는 필터를 포함하도록 CloudFormation 템플릿을 수정합니다.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

스택을 재배포하고 나면 다음 DynamoDB 항목을 테이블에 추가할 수 있습니다. 이 예제의 제품은 캘리포니아에서 만들어진 제품이므로 Lambda 함수 로그에는 표시되지 않습니다.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **PK 및 SK의 일부 값으로 시작하는 항목만**

이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

AND 조건을 사용하려면 패턴 내부에 조건이 있어야 하며 이 경우 PK 및 SK 키가 쉼표로 구분되어 동일한 표현식 안에 있습니다.

PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.

이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

필터 섹션에 새 패턴을 도입하여 OR 조건이 추가되었음을 알 수 있습니다.

## 종합 - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

다음 샘플 CDK 프로젝트 구성 템플릿은 이벤트 필터링 기능을 안내합니다. 이 CDK 프로젝트로 작업하기 전에 [준비 스크립트 실행](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)을 포함한 [전제 조건을 설치](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)해야 합니다.

**CDK 프로젝트 만들기**

먼저 빈 디렉터리에서 `cdk init`를 호출하여 새 AWS CDK 프로젝트를 생성합니다.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

`cdk init` 명령은 프로젝트 폴더의 이름을 사용하여 클래스, 하위 폴더 및 파일을 포함한 프로젝트의 다양한 요소의 이름을 지정합니다. 폴더 이름의 하이픈은 밑줄로 변환됩니다. 그렇지 않으면 이름은 Python 식별자 형식을 따라야 합니다. 예를 들어 숫자로 시작하거나 공백을 포함해서는 안 됩니다.

새 프로젝트를 사용하려면 해당 가상 환경을 활성화하세요. 이렇게 하면 프로젝트의 종속성을 전역적으로 설치하는 대신 프로젝트 폴더에 로컬로 설치할 수 있습니다.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**참고**  
이를 Mac/Linux 명령으로 인식하여 가상 환경을 활성화할 수 있습니다. Python 템플릿에는 Windows에서 동일한 명령을 사용할 수 있도록 하는 배치 파일인 `source.bat`가 포함되어 있습니다. 기존 Windows 명령 `.venv\Scripts\activate.bat`도 작동합니다. AWS CDK Toolkit v1.70.0 이하를 사용하여 AWS CDK 프로젝트를 초기화한 경우 가상 환경은 .`.venv` 대신 `.env` 디렉터리에 있습니다.

**기본 인프라**

선호하는 텍스트 편집기로 `./ddb_filters/ddb_filters_stack.py` 파일을 엽니다. 이 파일은 AWS CDK 프로젝트를 생성할 때 자동으로 생성되었습니다.

다음으로 `_create_ddb_table` 및 `_set_ddb_trigger_function` 함수를 추가합니다. 이러한 함수는 프로비저닝 모드 온디맨드 모드에서 파티션 키 PK 및 정렬 키 SK가 있는 DynamoDB 테이블을 생성하며 Amazon DynamoDB Streams가 기본적으로 활성화되어 새 이미지와 이전 이미지를 표시합니다.

Lambda 함수는 `app.py` 파일 아래의 `lambda` 폴더에 저장됩니다. 이 파일은 나중에 생성됩니다. 여기에는 이 스택에서 생성된 Amazon DynamoDB 테이블의 이름이 될 환경 변수 `APP_TABLE_NAME`이 포함됩니다. 동일한 함수에서 Lambda 함수에 스트림 읽기 권한을 부여합니다. 마지막으로 DynamoDB Streams를 람다 함수의 이벤트 소스로 구독합니다.

`__init__` 메서드의 파일 끝에서 각 구성을 호출하여 스택에서 초기화합니다. 추가 구성 요소와 서비스가 필요한 더 큰 프로젝트의 경우 기본 스택 외부에서 이러한 구성을 정의하는 것이 가장 좋습니다.

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

이제 Amazon CloudWatch에 로그를 출력하는 매우 간단한 람다 함수를 만들어 보겠습니다. 이를 위해 `lambda`라는 새 폴더를 만듭니다.

```
mkdir lambda
touch app.py
```

자주 사용하는 텍스트 편집기를 사용하여 다음 내용을 `app.py` 파일에 추가합니다.

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

`/ddb_filters/` 폴더에 있는지 확인하고 다음 명령을 입력하여 샘플 애플리케이션을 생성합니다.

```
cdk deploy
```

어느 시점에서 솔루션 배포 여부를 확인하라는 메시지가 표시됩니다. `Y`를 입력하여 변경 내용을 적용합니다.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

변경 사항이 배포되면 AWS 콘솔을 열고 테이블에 항목 하나를 추가합니다.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

이제 CloudWatch 로그에 이 항목의 모든 정보가 포함되어야 합니다.

**필터링 예제**
+ **지정된 주와 일치하는 제품만**

`ddb_filters/ddb_filters/ddb_filters_stack.py` 파일을 열고 "FL"과 동일한 모든 제품과 일치하는 필터를 포함하도록 수정합니다. 이는 45행의 `event_subscription` 바로 아래에서 수정할 수 있습니다.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **PK 및 SK의 일부 값으로 시작하는 항목만**

다음 조건을 포함하도록 Python 스크립트를 수정합니다.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.**

다음 조건을 포함하도록 Python 스크립트를 수정합니다.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

필터 배열에 더 많은 요소를 추가하면 OR 조건이 추가됩니다.

**정리**

작업 디렉터리의 베이스에서 필터 스택을 찾고 `cdk destroy`를 실행합니다. 리소스 삭제를 확인하라는 메시지가 표시됩니다.

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```