이 자습서에서는 DynamoDB 테이블의 스트림에서 일부 이벤트만 처리하기 위해 AWS Lambda 트리거를 생성합니다.
Lambda 이벤트 필터링을 사용하면 필터 표현식을 사용하여 Lambda가 어떤 이벤트를 처리를 위해 함수로 보내는지를 제어할 수 있습니다. DynamoDB Streams당 최대 5개의 서로 다른 필터를 구성할 수 있습니다. 일괄 처리 기간을 사용하는 경우 Lambda는 각 새 이벤트에 필터 기준을 적용하여 현재 배치에 포함되어야 하는지 확인합니다.
필터는 FilterCriteria
라는 구조를 통해 적용됩니다. FilterCriteria
의 3가지 주요 속성은 metadata properties
, data properties
및 filter patterns
입니다.
다음은 DynamoDB Streams 이벤트의 예제 구조입니다.
{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }
metadata properties
는 이벤트 객체의 필드입니다. DynamoDB Streams의 경우 metadata properties
은 dynamodb
또는 eventName
과 같은 필드입니다.
data properties
은 이벤트 본문의 필드입니다. data properties
을 필터링하려면 적절한 키 내의 FilterCriteria
에 해당 속성을 포함해야 합니다. DynamoDB 이벤트 소스의 경우 데이터 키는 NewImage
또는 OldImage
입니다.
마지막으로 필터 규칙은 특정 속성에 적용할 필터 표현식을 정의합니다. 여기 몇 가지 예가 있습니다:
비교 연산자 | 예제 | 규칙 구문(부분적) |
---|---|---|
Null |
제품 유형이 null입니다. |
|
비어 있음 |
제품 이름이 비어 있습니다. |
|
같음 |
주가 플로리다와 같습니다. |
|
및 |
제품 주는 플로리다와 같고 제품 범주는 초콜릿과 같습니다. |
|
Or |
제품 주는 플로리다 또는 캘리포니아입니다. |
|
아님 |
제품 주는 플로리다가 아닙니다. |
|
존재함 |
홈메이드 제품이 있습니다. |
|
존재하지 않음 |
홈메이드 제품이 존재하지 않습니다 |
|
다음으로 시작 |
PK는 COMPANY로 시작합니다. |
|
Lambda 함수에 대해 최대 5개의 이벤트 필터링 패턴을 지정할 수 있습니다. 5개의 이벤트 각각은 논리적 OR로 평가됩니다. 따라서 Filter_One
및 Filter_Two
라는 두 개의 필터를 구성하면 Lambda 함수가 Filter_One
또는 Filter_Two
를 실행합니다.
참고
Lambda 이벤트 필터링 페이지에는 숫자 값을 필터링하고 비교하는 몇 가지 옵션이 있지만 DynamoDB 필터 이벤트의 경우 DynamoDB의 숫자가 문자열로 저장되기 때문에 적용되지 않습니다. 예를 들어 "quantity": { "N": "50"
}
의 경우, "N"
속성 때문에 숫자임을 알 수 있습니다.
종합 - AWS CloudFormation
이벤트 필터링 기능을 실제로 보여주기 위해 CloudFormation 템플릿 샘플을 소개합니다. 이 템플릿은 Amazon DynamoDB Streams가 활성화된 파티션 키 PK 및 정렬 키 SK가 있는 간단한 DynamoDB 테이블을 생성합니다. 그러면 Amazon Cloudwatch에 로그를 쓰고 Amazon DynamoDB 스트림에서 이벤트를 읽을 수 있는 람다 함수와 간단한 Lambda 실행 역할이 생성됩니다. 또한 DynamoDB Streams와 Lambda 함수 사이에 이벤트 소스 매핑을 추가하므로 Amazon DynamoDB Streams에 이벤트가 있을 때마다 함수를 실행할 수 있습니다.
AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn
이 클라우드 구성 템플릿을 배포한 후 다음과 같은 Amazon DynamoDB 항목을 삽입할 수 있습니다.
{
"PK": "COMPANY#1000",
"SK": "PRODUCT#CHOCOLATE#DARK",
"company_id": "1000",
"type": "",
"state": "FL",
"stores": 5,
"price": 15,
"quantity": 50,
"fabric": "Florida Chocolates"
}
이 클라우드 형성 템플릿에 인라인으로 포함된 간단한 람다 함수 덕분에 다음과 같이 람다 함수에 대한 Amazon CloudWatch 로그 그룹의 이벤트를 볼 수 있습니다.
{
"eventID": "c9fbe7d0261a5163fcb6940593e41797",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-2",
"dynamodb": {
"ApproximateCreationDateTime": 1664559083.0,
"Keys": {
"SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
"PK": { "S": "COMPANY#1000" }
},
"NewImage": {
"quantity": { "N": "50" },
"company_id": { "S": "1000" },
"fabric": { "S": "Florida Chocolates" },
"price": { "N": "15" },
"stores": { "N": "5" },
"product_id": { "S": "1000" },
"SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
"PK": { "S": "COMPANY#1000" },
"state": { "S": "FL" },
"type": { "S": "" }
},
"SequenceNumber": "700000000000888747038",
"SizeBytes": 174,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
필터링 예제
-
지정된 주와 일치하는 제품만
이 예제에서는 플로리다(약어 'FL')에서 생산되는 모든 제품을 일치시키는 필터를 포함하도록 CloudFormation 템플릿을 수정합니다.
EventSourceDDBTableStream:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 1
Enabled: True
FilterCriteria:
Filters:
- Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
FunctionName: !GetAtt ProcessEventLambda.Arn
StartingPosition: LATEST
스택을 재배포하고 나면 다음 DynamoDB 항목을 테이블에 추가할 수 있습니다. 이 예제의 제품은 캘리포니아에서 만들어진 제품이므로 Lambda 함수 로그에는 표시되지 않습니다.
{
"PK": "COMPANY#1000",
"SK": "PRODUCT#CHOCOLATE#DARK#1000",
"company_id": "1000",
"fabric": "Florida Chocolates",
"price": 15,
"product_id": "1000",
"quantity": 50,
"state": "CA",
"stores": 5,
"type": ""
}
-
PK 및 SK의 일부 값으로 시작하는 항목만
이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.
EventSourceDDBTableStream:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 1
Enabled: True
FilterCriteria:
Filters:
- Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
FunctionName: !GetAtt ProcessEventLambda.Arn
StartingPosition: LATEST
AND 조건을 사용하려면 패턴 내부에 조건이 있어야 하며 이 경우 PK 및 SK 키가 쉼표로 구분되어 동일한 표현식 안에 있습니다.
PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.
이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.
EventSourceDDBTableStream:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 1
Enabled: True
FilterCriteria:
Filters:
- Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
- Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
FunctionName: !GetAtt ProcessEventLambda.Arn
StartingPosition: LATEST
필터 섹션에 새 패턴을 도입하여 OR 조건이 추가되었음을 알 수 있습니다.
종합 - CDK
다음 샘플 CDK 프로젝트 구성 템플릿은 이벤트 필터링 기능을 안내합니다. 이 CDK 프로젝트로 작업하기 전에 준비 스크립트 실행을 포함한 전제 조건을 설치해야 합니다.
CDK 프로젝트 만들기
먼저 빈 디렉터리에서 cdk init
를 호출하여 새 AWS CDK 프로젝트를 생성합니다.
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
cdk init
명령은 프로젝트 폴더의 이름을 사용하여 클래스, 하위 폴더 및 파일을 포함한 프로젝트의 다양한 요소의 이름을 지정합니다. 폴더 이름의 하이픈은 밑줄로 변환됩니다. 그렇지 않으면 이름은 Python 식별자 형식을 따라야 합니다. 예를 들어 숫자로 시작하거나 공백을 포함해서는 안 됩니다.
새 프로젝트를 사용하려면 해당 가상 환경을 활성화하세요. 이렇게 하면 프로젝트의 종속성을 전역적으로 설치하는 대신 프로젝트 폴더에 로컬로 설치할 수 있습니다.
source .venv/bin/activate
python -m pip install -r requirements.txt
참고
이를 Mac/Linux 명령으로 인식하여 가상 환경을 활성화할 수 있습니다. Python 템플릿에는 Windows에서 동일한 명령을 사용할 수 있도록 하는 배치 파일인 source.bat
가 포함되어 있습니다. 기존 Windows 명령 .venv\Scripts\activate.bat
도 작동합니다. AWS CDK Toolkit v1.70.0 이하를 사용하여 AWS CDK 프로젝트를 초기화한 경우 가상 환경은 ..venv
대신 .env
디렉터리에 있습니다.
기본 인프라
선호하는 텍스트 편집기로 ./ddb_filters/ddb_filters_stack.py
파일을 엽니다. 이 파일은 AWS CDK 프로젝트를 생성할 때 자동으로 생성되었습니다.
다음으로 _create_ddb_table
및 _set_ddb_trigger_function
함수를 추가합니다. 이러한 함수는 프로비저닝 모드 온디맨드 모드에서 파티션 키 PK 및 정렬 키 SK가 있는 DynamoDB 테이블을 생성하며 Amazon DynamoDB Streams가 기본적으로 활성화되어 새 이미지와 이전 이미지를 표시합니다.
Lambda 함수는 app.py
파일 아래의 lambda
폴더에 저장됩니다. 이 파일은 나중에 생성됩니다. 여기에는 이 스택에서 생성된 Amazon DynamoDB 테이블의 이름이 될 환경 변수 APP_TABLE_NAME
이 포함됩니다. 동일한 함수에서 Lambda 함수에 스트림 읽기 권한을 부여합니다. 마지막으로 DynamoDB Streams를 람다 함수의 이벤트 소스로 구독합니다.
__init__
메서드의 파일 끝에서 각 구성을 호출하여 스택에서 초기화합니다. 추가 구성 요소와 서비스가 필요한 더 큰 프로젝트의 경우 기본 스택 외부에서 이러한 구성을 정의하는 것이 가장 좋습니다.
import os
import json
import aws_cdk as cdk
from aws_cdk import (
Stack,
aws_lambda as _lambda,
aws_dynamodb as dynamodb,
)
from constructs import Construct
class DdbFiltersStack(Stack):
def _create_ddb_table(self):
dynamodb_table = dynamodb.Table(
self,
"AppTable",
partition_key=dynamodb.Attribute(
name="PK", type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="SK", type=dynamodb.AttributeType.STRING),
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
removal_policy=cdk.RemovalPolicy.DESTROY,
)
cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
return dynamodb_table
def _set_ddb_trigger_function(self, ddb_table):
events_lambda = _lambda.Function(
self,
"LambdaHandler",
runtime=_lambda.Runtime.PYTHON_3_9,
code=_lambda.Code.from_asset("lambda"),
handler="app.handler",
environment={
"APP_TABLE_NAME": ddb_table.table_name,
},
)
ddb_table.grant_stream_read(events_lambda)
event_subscription = _lambda.CfnEventSourceMapping(
scope=self,
id="companyInsertsOnlyEventSourceMapping",
function_name=events_lambda.function_name,
event_source_arn=ddb_table.table_stream_arn,
maximum_batching_window_in_seconds=1,
starting_position="LATEST",
batch_size=1,
)
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
ddb_table = self._create_ddb_table()
self._set_ddb_trigger_function(ddb_table)
이제 Amazon CloudWatch에 로그를 출력하는 매우 간단한 람다 함수를 만들어 보겠습니다. 이를 위해 lambda
라는 새 폴더를 만듭니다.
mkdir lambda
touch app.py
자주 사용하는 텍스트 편집기를 사용하여 다음 내용을 app.py
파일에 추가합니다.
import logging
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
def handler(event, context):
LOGGER.info('Received Event: %s', event)
for rec in event['Records']:
LOGGER.info('Record: %s', rec)
/ddb_filters/
폴더에 있는지 확인하고 다음 명령을 입력하여 샘플 애플리케이션을 생성합니다.
cdk deploy
어느 시점에서 솔루션 배포 여부를 확인하라는 메시지가 표시됩니다. Y
를 입력하여 변경 내용을 적용합니다.
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘
Do you wish to deploy these changes (y/n)? y
...
✨ Deployment time: 67.73s
Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
변경 사항이 배포되면 AWS 콘솔을 열고 테이블에 항목 하나를 추가합니다.
{
"PK": "COMPANY#1000",
"SK": "PRODUCT#CHOCOLATE#DARK",
"company_id": "1000",
"type": "",
"state": "FL",
"stores": 5,
"price": 15,
"quantity": 50,
"fabric": "Florida Chocolates"
}
이제 CloudWatch 로그에 이 항목의 모든 정보가 포함되어야 합니다.
필터링 예제
-
지정된 주와 일치하는 제품만
ddb_filters/ddb_filters/ddb_filters_stack.py
파일을 열고 "FL"과 동일한 모든 제품과 일치하는 필터를 포함하도록 수정합니다. 이는 45행의 event_subscription
바로 아래에서 수정할 수 있습니다.
event_subscription.add_property_override(
property_path="FilterCriteria",
value={
"Filters": [
{
"Pattern": json.dumps(
{"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
)
},
]
},
)
-
PK 및 SK의 일부 값으로 시작하는 항목만
다음 조건을 포함하도록 Python 스크립트를 수정합니다.
event_subscription.add_property_override(
property_path="FilterCriteria",
value={
"Filters": [
{
"Pattern": json.dumps(
{
{
"dynamodb": {
"Keys": {
"PK": {"S": [{"prefix": "COMPANY"}]},
"SK": {"S": [{"prefix": "PRODUCT"}]},
}
}
}
}
)
},
]
},
-
PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.
다음 조건을 포함하도록 Python 스크립트를 수정합니다.
event_subscription.add_property_override(
property_path="FilterCriteria",
value={
"Filters": [
{
"Pattern": json.dumps(
{
{
"dynamodb": {
"Keys": {
"PK": {"S": [{"prefix": "COMPANY"}]},
"SK": {"S": [{"prefix": "PRODUCT"}]},
}
}
}
}
)
},
{
"Pattern": json.dumps(
{"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
)
},
]
},
)
필터 배열에 더 많은 요소를 추가하면 OR 조건이 추가됩니다.
정리
작업 디렉터리의 베이스에서 필터 스택을 찾고 cdk destroy
를 실행합니다. 리소스 삭제를 확인하라는 메시지가 표시됩니다.
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y