

# Amazon DynamoDB에서 AWS Lambda 사용
<a name="with-ddb"></a>

**참고**  
Lambda 함수 이외의 대상으로 데이터를 전송하거나 데이터를 전송하기 전에 데이터를 보강하려는 경우 [Amazon EventBridge 파이프](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)를 참조하세요.

AWS Lambda 함수를 사용하여 [Amazon DynamoDB 데이터 스트림](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)의 레코드를 처리할 수 있습니다. DynamoDB Streams를 사용하여 DynamoDB 테이블이 업데이트될 때마다 추가 작업을 수행하는 Lambda 함수를 트리거할 수 있습니다.

DynamoDB 스트림을 처리할 때, 배치의 일부 레코드가 실패하는 경우 처리 완료된 레코드가 재시도되지 않도록 부분 배치 응답 로직을 구현해야 합니다. Powertools for AWS Lambda의 [배치 프로세서 유틸리티](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)는 Python, TypeScript, .NET 및 Java에서 사용할 수 있으며, 부분 배치 응답 로직을 자동으로 처리하여 개발 시간을 단축하고 신뢰성을 개선하여 해당 구현을 간소화합니다.

**Topics**
+ [폴링 및 배치 처리 스트림](#dynamodb-polling-and-batching)
+ [폴링 및 스트리밍 시작 위치](#dyanmo-db-stream-poll)
+ [DynamoDB Streams 내 샤드의 동시 리더](#events-dynamodb-simultaneous-readers)
+ [예제 이벤트](#events-sample-dynamodb)
+ [Lambda로 DynamoDB 레코드 처리](services-dynamodb-eventsourcemapping.md)
+ [DynamoDB 및 Lambda로 부분 배치 응답 구성](services-ddb-batchfailurereporting.md)
+ [Lambda에서 DynamoDB 이벤트 소스에 대한 폐기된 레코드 유지](services-dynamodb-errors.md)
+ [Lambda에서 상태 저장 DynamoDB 스트림 처리 구현](services-ddb-windows.md)
+ [Amazon DynamoDB 이벤트 소스 매핑을 위한 Lambda 파라미터](services-ddb-params.md)
+ [DynamoDB 이벤트 소스를 통해 이벤트 필터링 사용](with-ddb-filtering.md)
+ [자습서: Amazon DynamoDB Streams와 함께 AWS Lambda 사용](with-ddb-example.md)

## 폴링 및 배치 처리 스트림
<a name="dynamodb-polling-and-batching"></a>

Lambda는 초당 4회의 기본 속도로 레코드에 대해 DynamoDB 스트림의 샤드를 폴링합니다. 레코드를 사용할 수 있으면 Lambda가 함수를 간접 호출하고 결과를 기다립니다. 처리가 성공하면 Lambda가 레코드를 더 받을 때까지 폴링을 재개합니다.

기본적으로, Lambda는 레코드가 사용 가능하게 되는 즉시 함수를 간접 호출합니다. Lambda가 이벤트 소스에서 읽는 배치에 하나의 레코드만 있는 경우, Lambda는 함수에 하나의 레코드만 전송합니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 *일괄 처리 기간*을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 전체 배치가 수집되거나, 일괄 처리 기간이 만료되거나, 배치가 페이로드 한도인 6MB에 도달할 때까지 이벤트 소스에서 레코드를 계속 읽습니다. 자세한 내용은 [일괄 처리 동작](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching) 섹션을 참조하세요.

**주의**  
Lambda 이벤트 소스 매핑은 각 이벤트를 한 번 이상 처리하므로 레코드가 중복될 수 있습니다. 중복 이벤트와 관련된 잠재적 문제를 방지하려면 함수 코드를 멱등성으로 만드는 것이 좋습니다. 자세한 내용은 AWS 지식 센터의 [함수를 멱등성 Lambda 함수로 만들려면 어떻게 해야 하나요?](https://repost.aws/knowledge-center/lambda-function-idempotent)를 참조하세요.

Lambda는 처리를 위해 다음 배치를 전송하기 전에 구성된 [확장](lambda-extensions.md)이 완료될 때까지 기다리지 않습니다. 즉, Lambda가 다음 레코드 배치를 처리할 때 확장이 계속 실행될 수 있습니다. 이로 인해 계정의 [동시성](lambda-concurrency.md) 설정 또는 스로틀링을 위반하는 경우 제한 문제가 발생할 수 있습니다. 이것이 잠재적인 문제인지 여부를 탐지하려면 함수를 모니터링하고 이벤트 소스 매핑에 대해 예상보다 높은 [동시성 지표](monitoring-concurrency.md#general-concurrency-metrics)가 표시되는지 확인하세요. 간접 호출 간격이 짧기 때문에 Lambda는 일시적으로 동시성 사용량을 샤드 수보다 더 높게 보고할 수 있습니다. 확장이 없는 Lambda 함수에서도 마찬가지입니다.

DynamoDB 데이터 스트림의 한 샤드와 하나 이상의 Lambda 간접 호출을 동시에 처리하도록 [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 설정을 구성합니다. Lambda가 병렬화 계수를 통해 샤드에서 폴링하는 동시 배치의 수는 1(기본값)부터 10까지 지정할 수 있습니다. 예를 들어 `ParallelizationFactor`를 2로 설정하는 경우, 최대 100개의 DynamoDB 스트림 샤드를 처리하기 위한 200번의 동시 Lambda 간접 호출을 보유할 수 있습니다(실제 `ConcurrentExecutions` 지표의 값은 다를 수 있음). 이는 데이터 볼륨이 일시적이고 [IteratorAge](monitoring-metrics-types.md#performance-metrics)가 높은 경우 처리량을 스케일 업하는 데 도움을 줍니다. 샤드당 동시 배치 수를 늘려도 Lambda는 항목(파티션 및 정렬 키) 수준에서 순차적인 처리를 계속 보장합니다.

## 폴링 및 스트리밍 시작 위치
<a name="dyanmo-db-stream-poll"></a>

이벤트 소스 매핑 생성 및 업데이트 중 스트림 폴링은 최종적으로 일관됩니다.
+ 이벤트 소스 매핑 생성 중 스트림에서 이벤트 폴링을 시작하는 데 몇 분 정도 걸릴 수 있습니다.
+ 이벤트 소스 매핑 업데이트 중 스트림에서 이벤트 폴링을 중지했다가 다시 시작하는 데 몇 분 정도 걸릴 수 있습니다.

이 동작은 스트림의 시작 위치로 `LATEST`를 지정하면 이벤트 소스 매핑이 생성 또는 업데이트 중에 이벤트를 놓칠 수 있음을 의미합니다. 누락된 이벤트가 없도록 스트림 시작 위치를 `TRIM_HORIZON`으로 지정하세요.

## DynamoDB Streams 내 샤드의 동시 리더
<a name="events-dynamodb-simultaneous-readers"></a>

글로벌 테이블이 아닌 단일 리전 테이블의 경우 최대 2개의 Lambda 함수가 동시에 동일 DynamoDB Streams 샤드에서 읽기 작업을 수행하도록 설계할 수 있습니다. 이 제한을 초과하면 요청 병목이 발생할 수 있습니다. 글로벌 테이블의 경우 요청 제한을 피하기 위해 동시 함수 수를 1로 제한하는 것이 좋습니다.

## 예제 이벤트
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# Lambda로 DynamoDB 레코드 처리
<a name="services-dynamodb-eventsourcemapping"></a>

이벤트 소스 매핑을 생성하여 Lambda가 스트림의 레코드를 Lambda 함수로 전송하도록 지시합니다. 여러 이벤트 소스 매핑을 생성하여 여러 Lambda 함수로 동일한 데이터를 처리하거나, 단일 함수로 여러 스트림의 항목을 처리할 수 있습니다.

다른 AWS 계정에서 스트림의 레코드를 처리하도록 이벤트 소스 매핑을 구성할 수 있습니다. 자세한 내용은 [계정 간 이벤트 소스 매핑 생성](#services-dynamodb-eventsourcemapping-cross-account)를 참조하세요.

DynamoDB 스트림에서 읽도록 함수를 구성하려면 [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) AWS 관리형 정책을 실행 역할에 연결한 다음 **DynamoDB** 트리거를 생성합니다.

**권한 추가 및 트리거 생성**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. 함수의 이름을 선택합니다.

1. **구성(Configuration)** 탭을 선택한 다음, **권한(Permissions)**을 선택합니다.

1. **역할 이름**에서 실행 역할에 대한 링크를 선택합니다. 이 링크를 클릭하면 IAM 콘솔에서 역할이 열립니다.  
![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/execution-role.png)

1. **권한 추가**를 선택하고 **정책 연결**을 선택합니다.  
![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/attach-policies.png)

1. 검색 필드에 `AWSLambdaDynamoDBExecutionRole`를 입력합니다. 실행 역할에 이 정책을 추가합니다. 함수가 DynamoDB 스트림에서 읽는 데 필요한 권한을 포함하는 AWS관리형 정책입니다. 이 정책에 대한 자세한 내용은 *AWS 관리형 정책 참조*의 [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html)을 참조하세요.

1. Lambda 콘솔에서 함수로 돌아갑니다. **함수 개요(Function overview)**에서 **트리거 추가(Add trigger)**를 선택합니다.  
![\[\]](http://docs.aws.amazon.com/ko_kr/lambda/latest/dg/images/add-trigger.png)

1. 트리거 유형을 선택합니다.

1. 필요한 옵션을 구성한 다음 **추가**를 선택합니다.

Lambda는 DynamoDB 이벤트 소스에 대해 다음 옵션을 지원합니다.

**이벤트 소스 옵션**
+ **DynamoDB 테이블** – 레코드를 읽을 DynamoDB 테이블입니다.
+ **배치 크기(Batch size)** - 각 배치에서 함수에 보낼 레코드 수입니다(최대 10,000개). Lambda는 한 번의 호출로 배치의 모든 레코드를 함수에 전달합니다. 단, 이벤트의 총 크기가 동기식 호출에 대한 [페이로드 한도](gettingstarted-limits.md)(6MB)를 초과하지 않아야 합니다.
+ **배치 기간(Batch window)** - 함수를 호출하기 전에 레코드를 수집할 최대 기간(단위: 초)를 지정합니다.
+ **시작 위치** – 새 레코드만, 또는 기존의 모든 레코드를 처리합니다.
  + **최신** – 스트림에 추가된 새 레코드를 처리합니다.
  + **수평 트리밍** – 스트림의 모든 레코드를 처리합니다.

  기존 레코드 처리 후 함수는 캐치업되고 새 레코드를 계속 처리합니다.
+ **On-failure destination**(실패 시 대상) – 처리할 수 없는 레코드에 대한 표준 SQS 대기열 또는 표준 SNS 주제입니다. 너무 오래되었거나 모든 재시도를 다 사용한 레코드 배치를 폐기할 때, Lambda는 해당 배치에 대한 세부 정보를 대기열 또는 주제로 보냅니다.
+ **Retry attempts(재시도)** - 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 이는 배치가 함수에 도달하지 않은 제한 또는 서비스 오류에 적용되지 않습니다.
+ **Maximum age of record(최대 레코드 사용 기간)** – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다.
+ **Split batch on error(오류 시 배치 분할)** – 함수에서 오류를 반환하면 재시도하기 전에 배치를 두 개로 분할합니다. 원래 배치 크기 설정은 변경되지 않습니다.
+ **Concurrent batches per shard(샤드당 동시 배치)** – 동일한 샤드의 여러 배치를 동시에 처리합니다.
+ **활성화** – 이벤트 소스 매핑을 활성화하려면 true로 설정합니다. 레코드 처리를 중지하려면 false로 설정합니다. Lambda는 마지막으로 처리된 레코드를 추적하여 매핑이 다시 활성화되면 해당 지점부터 처리를 다시 시작합니다.

**참고**  
DynamoDB 트리거의 일부로 Lambda에서 간접 호출한 GetRecords API 간접 호출에 대해서는 요금이 부과되지 않습니다.

나중에 이벤트 소스 구성을 관리하기 위해 디자이너에서 트리거를 선택합니다.

## 계정 간 이벤트 소스 매핑 생성
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

Amazon DynamoDB는 이제 [리소스 기반 정책](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html)을 지원합니다. 이 기능을 사용하면 한 AWS 계정에서 다른 계정의 Lambda 함수를 사용하여 DynamoDB 스트림의 데이터를 처리할 수 있습니다.

다른 AWS 계정에서 DynamoDB 스트림을 사용하여 Lambda 함수에 대한 이벤트 소스 매핑을 생성하려면 리소스 기반 정책을 사용하여 스트림을 구성해서 Lambda 함수에 레코드 읽기 권한을 부여합니다. 교차 계정 액세스를 허용하도록 스트림을 구성하는 방법을 자세히 알아보려면 *Amazon DynamoDB 개발자 안내서*의 [교차 계정 Lambda 함수를 통한 액세스 공유](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access)를 참조하세요.

스트림에서 Lambda 함수에 필요한 권한을 부여하는 리소스 기반 정책을 구성한 후에는 교차 계정 스트림 ARN을 사용하여 이벤트 소스 매핑을 생성합니다. 교차 계정 DynamoDB 콘솔의 테이블 **내보내기 및 스트림** 탭에서 스트림 ARN을 찾을 수 있습니다.

Lambda 콘솔을 사용하는 경우 이벤트 소스 매핑 생성 페이지의 DynamoDB 테이블 입력 필드에 스트림 ARN을 직접 붙여넣습니다.

 **참고:** 교차 리전 트리거는 지원되지 않습니다.

# DynamoDB 및 Lambda로 부분 배치 응답 구성
<a name="services-ddb-batchfailurereporting"></a>

이벤트 소스에서 스트리밍 데이터를 사용하고 처리할 때 기본적으로 Lambda는 배치가 완전히 성공한 경우에만 배치의 가장 높은 시퀀스 번호로 체크포인트를 수행합니다. Lambda는 다른 모든 결과를 완전한 실패로 처리하고 재시도 제한까지 배치 처리를 재시도합니다. 스트림에서 배치를 처리하는 동안 부분적인 성공을 허용하려면 `ReportBatchItemFailures`를 설정합니다. 부분적인 성공을 허용하면 레코드에 대한 재시도 횟수를 줄이는 데 도움이 되지만 성공한 레코드의 재시도 가능성을 완전히 막지는 못합니다.

`ReportBatchItemFailures`를 켜려면 [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes) 목록에 열거형 값 **ReportBatchItemFailures**를 포함시킵니다. 이 목록은 함수에 대해 활성화된 응답 유형을 나타냅니다. 이벤트 소스 매핑을 [생성](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)하거나 [업데이트](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)할 때 이 목록을 구성할 수 있습니다.

**참고**  
함수 코드가 부분 배치 실패 응답을 반환하더라도 이벤트 소스 매핑에 대해 `ReportBatchItemFailures` 기능이 명시적으로 활성화되지 않으면 Lambda가 해당 응답을 처리하지 않습니다.

## 보고서 구문
<a name="streams-batchfailurereporting-syntax"></a>

배치 항목 실패에 대한 보고를 구성할 때 `StreamsEventResponse` 클래스는 배치 항목 실패 목록과 함께 반환됩니다. `StreamsEventResponse` 객체를 사용하여 배치에서 첫 번째 실패한 레코드의 시퀀스 번호를 반환할 수 있습니다. 올바른 응답 구문을 사용하여 고유한 사용자 지정 클래스를 생성할 수도 있습니다. 다음 JSON 구조는 필요한 응답 구문을 보여줍니다.

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**참고**  
`batchItemFailures` 어레이에 여러 항목이 포함되어 있으면 Lambda는 시퀀스 번호가 가장 낮은 레코드를 체크포인트로 사용합니다. 그런 다음 Lambda는 해당 체크포인트에서 시작하여 모든 레코드를 다시 시도합니다.

## 성공 및 실패 조건
<a name="streams-batchfailurereporting-conditions"></a>

Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.
+ 비어 있는 `batchItemFailure` 목록
+ null `batchItemFailure` 목록
+ 비어 있는 `EventResponse`
+ null `EventResponse`

Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.
+ 빈 문자열 `itemIdentifier`
+ null `itemIdentifier`
+ 키 이름이 잘못된 `itemIdentifier`

Lambda는 재시도 전략에 따라 실패를 재시도합니다.

## 배치 이등분
<a name="streams-batchfailurereporting-bisect"></a>

호출이 실패하고 `BisectBatchOnFunctionError`가 활성화되어 있으면 `ReportBatchItemFailures` 설정에 관계 없이 배치가 이등분됩니다.

부분적 배치 성공 응답이 수신되고 `BisectBatchOnFunctionError` 및 `ReportBatchItemFailures`가 모두 활성화되면 배치가 반환된 시퀀스 번호에서 이등분되고 Lambda는 나머지 레코드만 재시도합니다.

부분 배치 응답 로직 구현을 간소화하려면 이러한 복잡성을 자동 처리할 수 있는 Powertools for AWS Lambda의 [배치 프로세서 유틸리티](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)를 사용하는 것이 좋습니다.

다음은 일괄적으로 실패한 메시지 ID 목록을 반환하는 함수 코드의 몇 가지 예입니다.

------
#### [ .NET ]

**SDK for .NET**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
.NET을 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Go를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Java를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
JavaScript를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
TypeScript를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
PHP를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python(Boto3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Python을 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Ruby를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Rust를 사용하여 Lambda로 DynamoDB 배치 항목 실패 보고.  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Powertools for AWS Lambda 배치 프로세서 사용
<a name="services-ddb-batchfailurereporting-powertools"></a>

Powertools for AWS Lambda의 배치 프로세서 유틸리티는 부분 배치 응답 로직을 자동으로 처리하여 배치 실패 보고 구현에 따르는 복잡성을 줄입니다. 다음은 배치 프로세서를 사용하는 예제입니다.

**Python**  
전체 예제 및 설정 지침은 [배치 프로세서 설명서를](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) 참조하세요.
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
전체 예제 및 설정 지침은 [배치 프로세서 설명서를](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/) 참조하세요.
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
전체 예제 및 설정 지침은 [배치 프로세서 설명서를](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/) 참조하세요.
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
전체 예제 및 설정 지침은 [배치 프로세서 설명서를](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/) 참조하세요.
AWS Lambda 배치 프로세서를 사용하여 DynamoDB 스트림 레코드를 처리합니다.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Lambda에서 DynamoDB 이벤트 소스에 대한 폐기된 레코드 유지
<a name="services-dynamodb-errors"></a>

DynamoDB 이벤트 소스 매핑에 대한 오류 처리는 오류가 함수 간접 호출 전에 발생하는지 아니면 함수 간접 호출 중에 발생하는지 여부에 따라 달라집니다.
+ **간접 호출 전:** Lambda 이벤트 소스 매핑이 스로틀링 또는 기타 문제로 인해 함수를 간접적으로 간접 호출할 수 없는 경우 레코드가 만료되거나 이벤트 소스 매핑에 구성된 최대 기간([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds))을 초과할 때까지 재시도합니다.
+ **간접 호출 중:** 함수가 간접적으로 간접 호출되었지만 오류가 반환되는 경우 Lambda는 레코드가 만료되거나 최대 기간([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds))을 초과하거나 구성된 재시도 할당량([MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts))에 도달할 때까지 재시도합니다. 함수 오류의 경우 실패한 배치를 두 개의 작은 배치로 분할하여 잘못된 레코드를 격리하고 시간 초과를 방지하는 [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError)를 구성할 수도 있습니다. 배치를 분할할 때는 재시도 할당량이 소모되지 않습니다.

오류 처리에서 실패를 측정하는 경우 Lambda는 레코드를 폐기하고 스트림에서 배치 처리를 계속합니다. 기본 설정을 사용하는 경우 이는 잘못된 레코드가 영향을 받은 샤드에 대한 처리를 최대 1일 동안 차단할 수 있음을 의미합니다. 이를 방지하려면 함수의 이벤트 소스 매핑을 사용자의 사례에 적합한 최대 레코드 사용 기간 및 합당한 재시도 횟수로 구성합니다.

## 실패한 간접 호출에 대한 대상 구성
<a name="dynamodb-on-failure-destination-console"></a>

실패한 이벤트 소스 매핑 간접 호출 기록을 보관하려면 함수의 이벤트 소스 매핑에 대상을 추가합니다. 대상으로 전송된 각 레코드는 실패한 간접 호출에 대한 메타데이터를 포함하는 JSON 문서입니다. Amazon S3 대상의 경우 Lambda는 메타데이터와 함께 전체 간접 호출 레코드를 전송합니다. 모든 Amazon SNS 주제, Amazon SQS 대기열, Amazon S3 버킷 또는 Kafka를 대상으로 구성할 수 있습니다.

Amazon S3 대상을 사용하면 [Amazon S3 이벤트 알림](https://docs.aws.amazon.com/) 기능을 통해 객체를 대상 S3 버킷에 업로드할 때 알림을 받을 수 있습니다. 실패한 배치에서 자동 처리를 수행하기 위해 다른 Lambda 함수를 간접 호출하도록 S3 이벤트 알림을 구성할 수도 있습니다.

실행 역할에 대상에 대한 권한이 있어야 합니다.
+ **SQS 대상:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **SNS 대상:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **S3 대상:** [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 및 [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Kafka 대상:** [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

Kafka 주제를 Kafka 이벤트 소스 매핑의 실패 시 대상으로 구성할 수 있습니다. Lambda가 재시도 횟수를 소진한 후 레코드를 처리할 수 없거나 레코드가 최대 수명을 초과하는 경우 Lambda는 나중에 처리할 수 있도록 실패한 레코드를 지정된 Kafka 주제로 보냅니다. 자세한 내용은 [Kafka 주제를 실패 시 대상으로 사용](kafka-on-failure-destination.md) 항목을 참조하세요.

S3 대상에 대해 자체 KMS 키를 사용하여 암호화를 활성화한 경우 함수의 실행 역할에 [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html)를 직접 호출할 수 있는 권한도 있어야 합니다. KMS 키와 S3 버킷 대상이 Lambda 함수 및 실행 역할과 다른 계정에 있는 경우 실행 역할을 신뢰하도록 KMS 키를 구성하여 kms:GenerateDataKey를 허용합니다.

이 콘솔을 사용하여 장애 시 대상을 구성하려면 다음 단계를 따르세요.

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. 함수를 선택합니다.

1. **함수 개요(Function overview)**에서 **대상 추가(Add destination)**를 선택합니다.

1. **소스**의 경우 **이벤트 소스 매핑 간접 호출**을 선택합니다.

1. **이벤트 소스 매핑**의 경우 이 함수에 대해 구성된 이벤트 소스를 선택합니다.

1. **조건**의 경우 **실패 시**를 선택합니다. 이벤트 소스 매핑 간접 호출의 경우 이 조건만 수락됩니다.

1. **대상 유형**의 경우 Lambda가 간접 호출 레코드를 전송할 대상 유형을 선택합니다.

1. **Destination(대상)**에서 리소스를 선택합니다.

1. **저장**을 선택합니다.

AWS Command Line Interface(AWS CLI)를 사용하여 장애 시 대상을 구성할 수도 있습니다. 예를 들어, 다음 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 명령은 SQS 장애 시 대상이 있는 이벤트 소스 매핑을 `MyFunction`에 추가합니다.

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

다음 [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 명령은 두 번의 재시도 시도 후 또는 레코드가 1시간 이상 지난 경우 SNS 대상으로 실패한 간접 호출 레코드를 전송하도록 이벤트 소스 매핑을 업데이트합니다.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

업데이트된 설정은 비동기식으로 적용되며 프로세스가 완료된 후에야 출력에 반영됩니다. [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) 명령을 사용하여 현재 상태를 봅니다.

대상을 제거하려면 `destination-config` 파라미터의 인수로 빈 문자열을 제공합니다.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3에 대한 보안 모범 사례
<a name="ddb-s3-destination-security"></a>

함수 구성에서 대상을 제거하지 않고 대상으로 구성된 S3 버킷을 삭제하면 보안 위험이 초래될 수 있습니다. 사용자의 대상 버킷 이름을 다른 사용자가 알고 있는 경우 AWS 계정에서 버킷을 다시 생성할 수 있습니다. 실패한 간접 호출에 대한 레코드가 해당 버킷으로 전송되어 함수의 데이터가 공개될 수 있습니다.

**주의**  
함수의 간접 호출 레코드를 다른 AWS 계정의 S3 버킷으로 전송할 수 없도록 하려면 계정의 버킷에 대한 `s3:PutObject` 권한을 제한하는 조건을 함수의 실행 역할에 추가합니다.

다음 예제에서는 함수의 `s3:PutObject` 권한을 계정의 버킷으로 제한하는 IAM 정책을 보여줍니다. 또한 이 정책에서는 Lambda가 S3 버킷을 대상으로 사용하는 데 필요한 `s3:ListBucket` 권한도 부여합니다.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

AWS Management Console 또는 AWS CLI를 사용하여 함수의 실행 역할에 권한 정책을 추가하려면 다음 절차의 지침을 참조하세요.

------
#### [ Console ]

**함수의 실행 역할에 권한 정책을 추가하는 방법(콘솔)**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. 실행 역할을 수정하려는 Lambda 함수를 선택하세요.

1. **구성** 탭을 선택한 다음 **사용 권한**을 선택합니다.

1. **실행 역할** 탭에서 함수의 **역할 이름**을 선택하여 역할의 IAM 콘솔 페이지를 여세요.

1. 다음을 수행하여 역할에 기본 권한 정책을 추가하세요.

   1. **권한** 창에서 **권한 추가**를 선택하고 **인라인 정책 생성**을 선택하세요.

   1. **정책 편집기**에서 **JSON**을 선택하세요.

   1. 추가하려는 정책을 편집기에 붙여넣고(이때 기존 JSON 대체) **다음**을 선택하세요.

   1. **정책 세부 정보** 아래에 **정책 이름**을 입력하세요.

   1. **정책 생성**을 선택합니다.

------
#### [ AWS CLI ]

**함수의 실행 역할에 권한 정책을 추가하는 방법(CLI)**

1. 필요한 권한이 있는 JSON 정책 문서를 생성하고 로컬 디렉터리에 저장하세요.

1. IAM `put-role-policy` CLI 명령을 사용하여 함수의 실행 역할에 권한을 추가하세요. JSON 정책 문서를 저장한 디렉터리에서 다음 명령을 실행하고 역할 이름, 정책 이름 및 정책 문서를 고유한 값으로 바꾸세요.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Amazon SNS 및 Amazon SQS 간접 호출 레코드 예제
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

다음 예제에서는 DynamoDB 스트림에 대해 Lambda가 SQS 또는 SNS 대상으로 전송하는 간접 호출 레코드를 보여줍니다.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

이 정보를 사용하여 문제 해결을 위해 스트림에서 영향을 받은 레코드를 검색할 수 있습니다. 실제 레코드는 포함되지 않으므로 이 레코드를 처리하고 실제 레코드가 만료되고 없어지기 전에 스트림에서 해당 레코드를 검색해야 합니다.

### Amazon S3 간접 호출 레코드 예제
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

다음 예제에서는 DynamoDB 스트림에 대해 Lambda가 S3 버킷으로 전송하는 간접 호출 레코드를 보여줍니다. SQS 및 SNS 대상에 대한 이전 예제의 모든 필드 외에도 `payload` 필드에는 원래 간접 호출 레코드가 이스케이프된 JSON 문자열로 포함되어 있습니다.

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

간접 호출 레코드를 포함하는 S3 객체는 다음 이름 지정 규칙을 사용합니다.

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Lambda에서 상태 저장 DynamoDB 스트림 처리 구현
<a name="services-ddb-windows"></a>

Lambda 함수는 연속 스트림 처리 애플리케이션을 실행할 수 있습니다. 스트림은 애플리케이션을 통해 연속적으로 흐르는 무한 데이터를 나타냅니다. 이 지속적으로 업데이트되는 입력의 정보를 분석하기 위해 시간의 항으로 정의된 윈도우를 사용하여 포함된 레코드를 바인딩할 수 있습니다.

텀블링 기간은 일정한 간격으로 시작되고 끝나는 고유한 시간대입니다. 기본적으로 Lambda 호출은 상태 비저장입니다. 즉, 외부 데이터베이스 없이는 여러 연속 호출에서 데이터를 처리하는 데 사용할 수 없습니다. 그러나 텀블링 기간을 활성화하면 호출 간에 상태를 유지할 수 있습니다. 이 상태에는 현재 윈도우에 대해 이전에 처리된 메시지의 집계 결과가 포함됩니다. 상태는 샤드당 최대 1MB가 될 수 있습니다. 이 크기를 초과하면 Lambda가 윈도우를 조기 종료합니다.

스트림의 각 레코드는 특정 윈도우에 속합니다. Lambda는 각 레코드를 한 번 이상 처리하지만 각 레코드가 한 번만 처리된다고 보장하지는 않습니다. 드물게 오류 처리와 같이 일부 레코드가 두 번 이상 처리될 수 있습니다. 레코드는 항상 처음부터 순서대로 처리됩니다. 레코드가 두 번 이상 처리되는 경우 레코드가 비순차적으로 처리될 수 있습니다.

## 집계 및 처리
<a name="streams-tumbling-processing"></a>

집계 및 해당 집계의 최종 결과 처리를 위해 사용자 관리형 함수가 간접 호출됩니다. Lambda는 윈도우 내에서 수신한 모든 레코드를 집계합니다. 이러한 레코드를 각각 별도의 호출인 여러 배치에서 받을 수 있습니다. 각 호출은 상태를 수신합니다. 따라서 텀블링 기간을 사용할 때 Lambda 함수 응답에는 `state` 속성을 포함해야 합니다. 응답에 `state` 속성이 포함되지 않은 경우 Lambda는 이 호출이 실패한 것으로 간주합니다. 이 조건을 충족하기 위해 함수는 다음 JSON 모양의 `TimeWindowEventResponse` 객체를 반환할 수 있습니다.

**Example `TimeWindowEventResponse` 값**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**참고**  
Java 함수의 경우 `Map<String, String>`을 사용하여 상태를 나타내는 것이 좋습니다.

윈도우 끝에서 `isFinalInvokeForWindow` 플래그는 이것이 최종 상태이며 처리 준비가 되었음을 나타내기 위해 `true`로 설정됩니다. 처리 후 윈도우가 완료되고 최종 호출이 완료된 다음 상태가 삭제됩니다.

윈도우 끝에서 Lambda가 집계 결과에 대한 작업을 위해 최종 처리를 사용합니다. 최종 처리는 동기식으로 간접 호출됩니다. 성공적인 호출 후 함수가 시퀀스 번호를 검사하고 스트림 처리가 계속됩니다. 호출이 실패하면 Lambda 함수는 호출이 성공할 때까지 추가 처리를 일시 중단합니다.

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## 구성
<a name="streams-tumbling-config"></a>

이벤트 소스 매핑을 생성하거나 업데이트할 때 텀블링 윈도우를 구성할 수 있습니다. 텀블링 윈도우를 구성하려면 윈도우를 초 단위로 지정합니다([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). 다음 예제 AWS Command Line Interface(AWS CLI) 명령은 텀블링 윈도우가 120초인 스트리밍 이벤트 소스 매핑을 생성합니다. 집계 및 처리를 위해 정의된 Lambda 함수는 `tumbling-window-example-function`입니다.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda는 레코드가 스트림에 삽입된 시간을 기준으로 텀블링 윈도우 경계를 결정합니다. 모든 레코드에는 Lambda가 경계 결정에서 사용하는 대략적인 타임스탬프가 있습니다.

텀블링 윈도우 집계는 리샤딩을 지원하지 않습니다. 샤드가 끝나면 Lambda는 윈도우가 닫힌 것으로 간주하며 자식 샤드는 새 상태로 고유한 윈도우를 시작합니다.

텀블링 윈도우는 기존 재시도 정책 `maxRetryAttempts` 및 `maxRecordAge`를 완벽하게 지원합니다.

**Example Handler.py - 집계 및 처리**  
다음 Python 함수는 최종 상태를 집계한 다음 처리하는 방법을 보여줍니다.  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Amazon DynamoDB 이벤트 소스 매핑을 위한 Lambda 파라미터
<a name="services-ddb-params"></a>

모든 Lambda 이벤트 소스 유형은 동일한 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 및 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 작업을 공유합니다. 그러나 일부 파라미터만 DynamoDB Streams에 적용됩니다.


| 파라미터 | 필수 | 기본값 | 참고 | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  최대값: 10,000  | 
|  BisectBatchOnFunctionError  |  N  |  false  | 없음  | 
|  DestinationConfig  |  N  | 해당 사항 없음  |  폐기된 레코드의 표준 Amazon SQS 대기열 또는 표준 Amazon SNS 주제 대상  | 
|  활성  |  N  |  true  | 없음  | 
|  EventSourceArn  |  Y  | 해당 사항 없음 |  데이터 스트림 또는 스트림 소비자의 ARN  | 
|  FilterCriteria  |  N  | 해당 사항 없음  |  [Lambda가 함수로 보내는 이벤트에 대한 제어](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  | 해당 사항 없음  | 없음  | 
|  FunctionResponseTypes  |  N  | 해당 사항 없음 |  함수가 배치에서 특정 실패를 보고하도록 하려면 `FunctionResponseTypes`에 `ReportBatchItemFailures` 값을 포함하세요. 자세한 내용은 [DynamoDB 및 Lambda로 부분 배치 응답 구성](services-ddb-batchfailurereporting.md) 섹션을 참조하세요.  | 
|  MaximumBatchingWindowInSeconds  |  N  |  0  | 없음  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1  |  -1은 무한을 의미하며 실패한 레코드는 레코드가 만료될 때까지 재시도됩니다. [DynamoDB 스트림의 데이터 보존 한도](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention)는 24시간입니다. 최솟값: -1 최대값: 604,800  | 
|  MaximumRetryAttempts  |  N  |  -1  |  -1(무한): 레코드가 만료될 때까지 실패한 레코드가 다시 시도됩니다 최솟값: 0 최대값: 10,000  | 
|  ParallelizationFactor  |  N  |  1  |  최댓값: 10  | 
|  StartingPosition  |  Y  | 해당 사항 없음  |  TRIM\$1HIZON 또는 LATEST  | 
|  TumblingWindowInSeconds  |  N  | 해당 사항 없음  |  최솟값: 0 최댓값: 900  | 

# DynamoDB 이벤트 소스를 통해 이벤트 필터링 사용
<a name="with-ddb-filtering"></a>

이벤트 필터링을 사용하여 Lambda가 함수로 전송하는 스트림 또는 대기열의 레코드를 제어할 수 있습니다. 이벤트 필터링의 작동 방식에 대한 일반적인 내용은 [Lambda가 함수로 보내는 이벤트에 대한 제어](invocation-eventfiltering.md)을 참조하세요.

이 섹션에서는 DynamoDB 이벤트 소스에 대한 이벤트 필터링에 중점을 둡니다.

**참고**  
DynamoDB 이벤트 소스 매핑은 `dynamodb` 키에 대한 필터링만 지원합니다.

**Topics**
+ [DynamoDB 이벤트](#filtering-ddb)
+ [테이블 속성으로 필터링](#filtering-ddb-attributes)
+ [부울 표현식으로 필터링](#filtering-ddb-boolean)
+ [Exists 연산자 사용](#filtering-ddb-exists)
+ [DynamoDB 필터링을 위한 JSON 형식](#filtering-ddb-JSON-format)

## DynamoDB 이벤트
<a name="filtering-ddb"></a>

프라이머리 키 `CustomerName`과 `AccountManager` 및 `PaymentTerms` 속성이 있는 DynamoDB 테이블이 있다고 가정해 보겠습니다. 다음은 DynamoDB 테이블 스트림의 예제 레코드를 보여줍니다.

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

DynamoDB 테이블의 키 및 속성 값을 기준으로 필터링하려면 레코드의 `dynamodb` 키를 사용하세요. 다음 섹션에는 다양한 필터 유형에 대한 예제가 나와 있습니다.

### 테이블 키로 필터링
<a name="filtering-ddb-keys"></a>

프라이머리 키 `CustomerName`이 'AnyCompany Industries'인 레코드만 함수에서 처리하도록 하려는 경우를 가정해 보겠습니다. `FilterCriteria` 객체는 다음과 같습니다.

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

명확성을 더하기 위해 일반 JSON으로 확장된 필터의 `Pattern` 값은 다음과 같습니다.

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

콘솔, AWS CLI 또는 AWS SAM 템플릿을 사용하여 필터를 추가할 수 있습니다.

------
#### [ Console ]

콘솔을 사용하여 이 필터를 추가하려면 [이벤트 소스 매핑에 필터 기준 연결(콘솔)](invocation-eventfiltering.md#filtering-console)의 지침을 따르고 **필터 기준**에 대해 다음 문자열을 입력합니다.

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface(AWS CLI)를 사용하여 이러한 필터 기준으로 새 이벤트 소스 매핑을 생성하려면 다음 명령을 실행합니다.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

이러한 필터 기준을 기존 이벤트 소스 매핑에 추가하려면 다음 명령을 실행합니다.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM을 사용하여 이 필터를 추가하려면 이벤트 소스의 YAML 템플릿에 다음 코드 조각을 추가합니다.

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## 테이블 속성으로 필터링
<a name="filtering-ddb-attributes"></a>

DynamoDB에서는 `NewImage` 및 `OldImage` 키를 사용하여 속성 값을 필터링할 수도 있습니다. 최신 테이블 이미지의 `AccountManager` 속성이 'Pat Candella' 또는 'Shirley Rodriguez'인 레코드를 필터링하려고 한다고 가정해 보겠습니다. `FilterCriteria` 객체는 다음과 같습니다.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

명확성을 더하기 위해 일반 JSON으로 확장된 필터의 `Pattern` 값은 다음과 같습니다.

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

콘솔, AWS CLI 또는 AWS SAM 템플릿을 사용하여 필터를 추가할 수 있습니다.

------
#### [ Console ]

콘솔을 사용하여 이 필터를 추가하려면 [이벤트 소스 매핑에 필터 기준 연결(콘솔)](invocation-eventfiltering.md#filtering-console)의 지침을 따르고 **필터 기준**에 대해 다음 문자열을 입력합니다.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface(AWS CLI)를 사용하여 이러한 필터 기준으로 새 이벤트 소스 매핑을 생성하려면 다음 명령을 실행합니다.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

이러한 필터 기준을 기존 이벤트 소스 매핑에 추가하려면 다음 명령을 실행합니다.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM을 사용하여 이 필터를 추가하려면 이벤트 소스의 YAML 템플릿에 다음 코드 조각을 추가합니다.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## 부울 표현식으로 필터링
<a name="filtering-ddb-boolean"></a>

부울 AND 표현식을 사용하여 필터를 생성할 수도 있습니다. 이러한 표현식에는 테이블의 키 및 속성 파라미터가 모두 포함될 수 있습니다. `AccountManager`의 `NewImage` 값이 'Pat Candella'이고 `OldImage` 값이 'Terry Whitlock'인 레코드를 필터링하려고 한다고 가정해 보겠습니다. `FilterCriteria` 객체는 다음과 같습니다.

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

명확성을 더하기 위해 일반 JSON으로 확장된 필터의 `Pattern` 값은 다음과 같습니다.

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

콘솔, AWS CLI 또는 AWS SAM 템플릿을 사용하여 필터를 추가할 수 있습니다.

------
#### [ Console ]

콘솔을 사용하여 이 필터를 추가하려면 [이벤트 소스 매핑에 필터 기준 연결(콘솔)](invocation-eventfiltering.md#filtering-console)의 지침을 따르고 **필터 기준**에 대해 다음 문자열을 입력합니다.

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface(AWS CLI)를 사용하여 이러한 필터 기준으로 새 이벤트 소스 매핑을 생성하려면 다음 명령을 실행합니다.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

이러한 필터 기준을 기존 이벤트 소스 매핑에 추가하려면 다음 명령을 실행합니다.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

AWS SAM을 사용하여 이 필터를 추가하려면 이벤트 소스의 YAML 템플릿에 다음 코드 조각을 추가합니다.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**참고**  
DynamoDB 이벤트 필터링은 숫자 연산자(숫자 등호 및 숫자 범위)의 사용을 지원하지 않습니다. 테이블의 항목이 숫자로 저장되더라도 이러한 파라미터는 JSON 레코드 객체의 문자열로 변환됩니다.

## Exists 연산자 사용
<a name="filtering-ddb-exists"></a>

DynamoDB의 JSON 이벤트 객체가 구조화되는 방식으로 인해 Exists 연산자를 사용하려면 특별한 주의가 필요합니다. Exists 연산자는 이벤트 JSON의 리프 노드에서만 작동하므로 필터 패턴이 Exists를 사용하여 중간 노드를 테스트하는 경우에는 작동하지 않습니다. 다음 DynamoDB 테이블 항목을 고려하세요.

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

`"Organizations"`가 포함된 이벤트를 테스트하는 다음과 같은 필터 패턴을 생성할 수 있습니다.

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

그러나 이 필터 패턴은 `"Organizations"`가 리프 노드가 아니므로 일치하는 항목을 반환하지 않습니다. 다음 예제에서는 Exists 연산자를 올바르게 사용하여 원하는 필터 패턴을 구성하는 방법을 보여줍니다.

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## DynamoDB 필터링을 위한 JSON 형식
<a name="filtering-ddb-JSON-format"></a>

DynamoDB 소스에서 이벤트를 올바르게 필터링하려면 데이터 필드와 데이터 필드(`dynamodb`)의 필터 기준이 모두 유효한 JSON 형식이어야 합니다. 두 필드 중 하나가 유효한 JSON 형식이 아닌 경우 Lambda는 해당 메시지를 삭제하거나 예외를 발생시킵니다. 다음 표에는 특정 동작이 요약되어 있습니다.


| 수신 데이터 형식 | 데이터 속성에 대한 필터 패턴 형식 | 결과적 작업 | 
| --- | --- | --- | 
|  유효한 JSON  |  유효한 JSON  |  Lambda는 필터 기준에 따라 필터링합니다.  | 
|  유효한 JSON  |  데이터 속성에 대한 필터 패턴 없음  |  Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.  | 
|  유효한 JSON  |  JSON 외  |  이벤트 소스 매핑 생성 또는 업데이트 시 Lambda에서 예외가 발생합니다. 데이터 속성에 대한 필터 패턴은 유효한 JSON 형식이어야 합니다.  | 
|  JSON 외  |  유효한 JSON  |  Lambda는 해당 레코드를 삭제합니다.  | 
|  JSON 외  |  데이터 속성에 대한 필터 패턴 없음  |  Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.  | 
|  JSON 외  |  JSON 외  |  이벤트 소스 매핑 생성 또는 업데이트 시 Lambda에서 예외가 발생합니다. 데이터 속성에 대한 필터 패턴은 유효한 JSON 형식이어야 합니다.  | 

# 자습서: Amazon DynamoDB Streams와 함께 AWS Lambda 사용
<a name="with-ddb-example"></a>

 이 자습서에서는 Amazon DynamoDB 스트림의 이벤트를 사용하기 위해 Lambda 함수를 생성합니다.

## 사전 조건
<a name="with-ddb-prepare"></a>

### AWS Command Line Interface 설치
<a name="install_aws_cli"></a>

아직 AWS Command Line Interface를 설치하지 않은 경우 [AWS CLI의 최신 버전 설치 또는 업데이트](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)에서 설명하는 단계에 따라 설치하세요.

이 자습서에서는 명령을 실행할 셸 또는 명령줄 터미널이 필요합니다. Linux 및 macOS에서는 선호하는 셸과 패키지 관리자를 사용합니다.

**참고**  
Windows에서는 Lambda와 함께 일반적으로 사용하는 일부 Bash CLI 명령(예:`zip`)은 운영 체제의 기본 제공 터미널에서 지원되지 않습니다. Ubuntu와 Bash의 Windows 통합 버전을 가져오려면 [Linux용 Windows Subsystem을 설치](https://docs.microsoft.com/en-us/windows/wsl/install-win10)합니다.

## 실행 역할 생성
<a name="with-ddb-create-execution-role"></a>

함수에 AWS 리소스에 액세스할 수 있는 권한을 제공하는 [실행 역할](lambda-intro-execution-role.md)을 만듭니다.

**실행 역할을 만들려면**

1. IAM 콘솔에서 [역할 페이지](https://console.aws.amazon.com/iam/home#/roles)를 엽니다.

1. **역할 생성**을 선택합니다.

1. 다음 속성을 사용하여 역할을 만듭니다.
   + **신뢰할 수 있는 엔터티** – Lambda.
   + **권한** – **AWSLambdaDynamoDBExecutionRole**.
   + **역할 이름** – **lambda-dynamodb-role**.

**AWSLambdaDynamoDBExecutionRole**은 함수가 DynamoDB에서 항목을 읽고 CloudWatch Logs에 로그를 쓰는 데 필요한 권한을 가집니다.

## 함수 생성
<a name="with-ddb-example-create-function"></a>

DynamoDB 이벤트를 처리하는 Lambda 함수를 생성하세요. 함수 코드는 수신 이벤트 데이터 중 일부를 CloudWatch Logs에 기록합니다.

------
#### [ .NET ]

**SDK for .NET**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
.NET을 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Go를 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Java를 사용하여 Lambda로 DynamoDB 이벤트 소비  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
JavaScript를 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
TypeScript를 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
PHP를 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python(Boto3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Python을 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Ruby를 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Rust를 사용하여 Lambda로 DynamoDB 이벤트 사용.  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**함수를 만들려면**

1. 샘플 코드를 `example.js` 파일에 복사합니다.

1. 배포 패키지를 만듭니다.

   ```
   zip function.zip example.js
   ```

1. `create-function` 명령을 사용해 Lambda 함수를 만듭니다.

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## Lambda 함수 테스트
<a name="with-dbb-invoke-manually"></a>

이 단계에서는 `invoke` AWS Lambda CLI 명령과 다음 샘플 DynamoDB 이벤트를 사용하여 수동으로 Lambda 함수를 간접 호출합니다. 다음을 `input.txt`라는 파일에 복사합니다.

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

다음 `invoke` 명령을 실행합니다.

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

**cli-binary-format** 옵션은 AWS CLI 버전 2를 사용할 때 필요합니다. 이 설정을 기본 설정으로 지정하려면 `aws configure set cli-binary-format raw-in-base64-out`을(를) 실행하세요. 자세한 내용은 [AWS CLI 지원되는 글로벌 명령줄 옵션](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)을 *AWS Command Line Interface 사용 설명서 버전 2에서 참조하세요*.

이 함수는 응답 본문에 문자열 `message`를 반환합니다.

`outputfile.txt` 파일의 출력을 확인합니다.

## 스트림을 활성화하여 DynamoDB 테이블 생성
<a name="with-ddb-create-buckets"></a>

스트림을 활성화하여 Amazon DynamoDB 테이블을 생성합니다.

**DynamoDB 테이블을 생성하려면**

1. [DynamoDB 콘솔](https://console.aws.amazon.com/dynamodb)을 엽니다.

1. [**Create table**]을 선택합니다.

1. 다음 설정을 사용하여 테이블을 생성합니다.
   + **테이블 이름** – **lambda-dynamodb-stream**
   + **기본 키** – **id**(문자열)

1. **Create**를 선택합니다.

**스트림을 활성화하려면**

1. [DynamoDB 콘솔](https://console.aws.amazon.com/dynamodb)을 엽니다.

1. **테이블**을 선택합니다.

1. **lambda-dynamodb-stream** 테이블을 선택합니다.

1. **내보내기 및 스트림**에서 **DynamoDB 스트림 세부 정보**를 선택합니다.

1. **켜기**를 선택합니다.

1. **보기 유형**에서는 **키 속성만**을 선택합니다.

1. **스트림 켜기**를 선택합니다.

스트림 ARN을 적어둡니다. 다음 단계에서 스트림을 Lambda 함수와 연결할 때 필요합니다. 스트림 활성화에 대한 자세한 내용은 [DynamoDB 스트림을 사용하여 Table Activity 캡처](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)를 참조하세요.

## AWS Lambda에서 이벤트 소스 추가
<a name="with-ddb-attach-notification-configuration"></a>

AWS Lambda에서 이벤트 소스 매핑을 생성합니다. 이 이벤트 소스 매핑은 DynamoDB 스트림을 Lambda 함수와 연결합니다. 이 이벤트 소스 매핑을 생성하면 AWS Lambda가 스트림 폴링을 시작합니다.

다음 AWS CLI `create-event-source-mapping` 명령을 실행합니다. 명령이 실행된 후 UUID를 적어둡니다. 예를 들어, 이벤트 소스 매핑을 삭제할 때처럼 모든 명령에서 이벤트 소스 매핑을 참조하려면 이 UUID가 필요합니다.

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 이는 지정된 DynamoDB 스트림과 Lambda 함수 사이의 매핑을 생성합니다. DynamoDB 스트림을 여러 Lambda 함수와 연결하고 동일한 Lambda 함수를 여러 스트림과 연결할 수 있습니다. 하지만 Lambda 함수는 공유하는 스트림에 대해 읽기 처리량을 공유합니다.

다음 명령을 실행하여 이벤트 소스 매핑 목록을 가져올 수 있습니다.

```
aws lambda list-event-source-mappings
```

이 목록은 사용자가 생성한 모든 이벤트 소스 매핑을 반환하며 각 매핑에 대해 `LastProcessingResult`를 표시합니다. 이 필드는 문제가 있을 경우 유용한 메시지를 제공하는 데 사용됩니다. `No records processed`(AWS Lambda가 폴링을 시작하지 않았거나 스트림에 레코드가 없음을 나타냄) 및 `OK`(AWS Lambda가 레코드를 성공적으로 읽고 Lambda 함수를 간접 호출함을 나타냄) 같은 값은 아무 문제가 없다는 점을 나타냅니다. 문제가 있는 경우 오류 메시지를 수신합니다.

이벤트 소스 매핑이 많을 경우 함수 이름 파라미터를 사용하여 결과 범위를 좁히세요.

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## 설정 테스트
<a name="with-ddb-final-integration-test-no-iam"></a>

종합적 경험을 테스트합니다. 테이블 업데이트를 수행할 때 DynamoDB는 이벤트 레코드를 스트림에 기록합니다. AWS Lambda가 스트림을 폴링하면 스트림의 새 레코드를 감지하고 이벤트를 함수에 전달하여 사용자를 대신하여 Lambda 함수를 간접 호출합니다.

1. DynamoDB 콘솔에서 테이블에 항목을 추가, 업데이트하고 항목을 삭제합니다. DynamoDB는 이러한 작업에 대한 레코드를 스트림에 기록합니다.

1. AWS Lambda가 스트림을 폴링하고 스트림에 대한 업데이트를 감지하면 스트림에서 찾은 이벤트 데이터를 전달하여 Lambda 함수를 간접 호출합니다.

1. 함수는 Amazon CloudWatch에서 로그를 실행하고 생성합니다. Amazon CloudWatch 콘솔에서 보고된 로그를 확인할 수 있습니다.

## 다음 단계
<a name="with-ddb-next-steps"></a>

이 자습서에서는 Lambda를 사용하여 DynamoDB 스트림 이벤트를 처리하는 기본 과정을 보여줍니다. 프로덕션 워크로드의 경우 부분 배치 응답 로직을 구현하여 개별 레코드 오류를 보다 효율적으로 처리하는 것이 좋습니다. Powertools for AWS Lambda의 [배치 프로세서 유틸리티](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)는 Python, TypeScript, .NET 및 Java에서 사용할 수 있으며 이를 위한 강력한 솔루션을 제공하여 부분 배치 응답의 복잡성을 자동으로 처리하고 처리 완료된 레코드에 대한 재시도 횟수를 줄입니다.

## 리소스 정리
<a name="cleanup"></a>

이 자습서 용도로 생성한 리소스를 보관하고 싶지 않다면 지금 삭제할 수 있습니다. 더 이상 사용하지 않는 AWS 리소스를 삭제하면 AWS 계정에 불필요한 요금이 발생하는 것을 방지할 수 있습니다.

**Lambda 함수를 삭제하려면**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. 생성한 함수를 선택합니다.

1. **작업**(Actions), **삭제**(Delete)를 선택합니다.

1. 텍스트 입력 필드에 **confirm**를 입력하고 **Delete**(삭제)를 선택합니다.

**집행 역할 삭제**

1. IAM 콘솔에서 [역할 페이지](https://console.aws.amazon.com/iam/home#/roles)를 엽니다.

1. 생성한 실행 역할을 선택합니다.

1. **삭제**를 선택합니다.

1. 텍스트 입력 필드에 역할의 이름을 입력하고 **Delete**(삭제)를 선택합니다.

**DynamoDB 테이블을 삭제하려면**

1. DynamoDB 콘솔의 [테이블 페이지](https://console.aws.amazon.com//dynamodb/home#tables:)를 엽니다.

1. 생성한 테이블을 선택합니다.

1. **삭제**를 선택합니다.

1. 텍스트 상자에 **delete**를 입력합니다.

1. **테이블 삭제**를 선택합니다.