자습서: Amazon DynamoDB Streams와 함께 AWS Lambda 사용 - AWS Lambda

자습서: Amazon DynamoDB Streams와 함께 AWS Lambda 사용

이 자습서에서는 Amazon DynamoDB 스트림의 이벤트를 사용하기 위해 Lambda 함수를 생성합니다.

사전 조건

이 자습서에서는 사용자가 기본 Lambda 작업과 Lambda 콘솔에 대해 어느 정도 알고 있다고 가정합니다. 그렇지 않은 경우 콘솔로 Lambda 함수 생성의 지침에 따라 첫 Lambda 함수를 생성합니다.

다음 단계를 완료하려면 AWS CLI 버전 2가 필요합니다. 명령과 예상 결과는 별도의 블록에 나열됩니다.

aws --version

다음 결과가 표시됩니다.

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

긴 명령의 경우 이스케이프 문자(\)를 사용하여 명령을 여러 행으로 분할합니다.

Linux 및 macOS는 선호 셸과 패키지 관리자를 사용합니다.

참고

Windows에서는 Lambda와 함께 일반적으로 사용하는 일부 Bash CLI 명령(예:zip)은 운영 체제의 기본 제공 터미널에서 지원되지 않습니다. Ubuntu와 Bash의 Windows 통합 버전을 가져오려면 Linux용 Windows Subsystem을 설치합니다. 이 안내서의 예제 CLI 명령은 Linux 형식을 사용합니다. Windows CLI를 사용하는 경우 인라인 JSON 문서를 포함하는 명령의 형식을 다시 지정해야 합니다.

실행 역할 생성

함수에 AWS 리소스에 액세스할 수 있는 권한을 제공하는 실행 역할을 만듭니다.

실행 역할을 만들려면
  1. IAM 콘솔에서 역할 페이지를 엽니다.

  2. 역할 생성을 선택합니다.

  3. 다음 속성을 사용하여 역할을 만듭니다.

    • 신뢰할 수 있는 엔터티 – Lambda.

    • 권한AWSLambdaDynamoDBExecutionRole.

    • 역할 이름lambda-dynamodb-role.

AWSLambdaDynamoDBExecutionRole은 함수가 DynamoDB에서 항목을 읽고 CloudWatch Logs에 로그를 쓰는 데 필요한 권한을 가집니다.

함수 생성

DynamoDB 이벤트를 처리하는 Lambda 함수를 생성하세요. 함수 코드는 수신 이벤트 데이터 중 일부를 CloudWatch Logs에 기록합니다.

.NET
AWS SDK for .NET
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

.NET을 사용하여 Lambda로 DynamoDB 이벤트 사용.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); foreach (var record in dynamoEvent.Records) { context.Logger.LogInformation($"Event ID: {record.EventID}"); context.Logger.LogInformation($"Event Name: {record.EventName}"); context.Logger.LogInformation(JsonSerializer.Serialize(record)); } context.Logger.LogInformation("Stream processing complete."); } }
Go
SDK for Go V2
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Go를 사용하여 Lambda로 DynamoDB 이벤트 사용.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/events" "fmt" ) func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) { if len(event.Records) == 0 { return nil, fmt.Errorf("received empty event") } for _, record := range event.Records { LogDynamoDBRecord(record) } message := fmt.Sprintf("Records processed: %d", len(event.Records)) return &message, nil } func main() { lambda.Start(HandleRequest) } func LogDynamoDBRecord(record events.DynamoDBEventRecord){ fmt.Println(record.EventID) fmt.Println(record.EventName) fmt.Printf("%+v\n", record.Change) }
Java
SDK for Java 2.x
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Java를 사용하여 Lambda로 DynamoDB 이벤트 소비

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; import com.google.gson.Gson; import com.google.gson.GsonBuilder; public class example implements RequestHandler<DynamodbEvent, Void> { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); @Override public Void handleRequest(DynamodbEvent event, Context context) { System.out.println(GSON.toJson(event)); event.getRecords().forEach(this::logDynamoDBRecord); return null; } private void logDynamoDBRecord(DynamodbStreamRecord record) { System.out.println(record.getEventID()); System.out.println(record.getEventName()); System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb())); } }
JavaScript
SDK for JavaScript (v3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

JavaScript를 사용하여 Lambda로 DynamoDB 이벤트 사용.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); }; const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };

TypeScript를 사용하여 Lambda로 DynamoDB 이벤트 사용.

export const handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); } const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };
PHP
SDK for PHP
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

PHP를 사용하여 Lambda로 DynamoDB 이벤트 사용.

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\DynamoDb\DynamoDbHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends DynamoDbHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleDynamoDb(DynamoDbEvent $event, Context $context): void { $this->logger->info("Processing DynamoDb table items"); $records = $event->getRecords(); foreach ($records as $record) { $eventName = $record->getEventName(); $keys = $record->getKeys(); $old = $record->getOldImage(); $new = $record->getNewImage(); $this->logger->info("Event Name:".$eventName."\n"); $this->logger->info("Keys:". json_encode($keys)."\n"); $this->logger->info("Old Image:". json_encode($old)."\n"); $this->logger->info("New Image:". json_encode($new)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords items"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Python을 사용하여 Lambda로 DynamoDB 이벤트 사용.

import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
Ruby
SDK for Ruby
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Ruby를 사용하여 Lambda로 DynamoDB 이벤트 사용.

def lambda_handler(event:, context:) return 'received empty event' if event['Records'].empty? event['Records'].each do |record| log_dynamodb_record(record) end "Records processed: #{event['Records'].length}" end def log_dynamodb_record(record) puts record['eventID'] puts record['eventName'] puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}" end
Rust
SDK for Rust
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Rust를 사용하여 Lambda로 DynamoDB 이벤트 사용.

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
함수를 만들려면
  1. 샘플 코드를 example.js 파일에 복사합니다.

  2. 배포 패키지를 만듭니다.

    zip function.zip example.js
  3. create-function 명령을 사용해 Lambda 함수를 만듭니다.

    aws lambda create-function --function-name ProcessDynamoDBRecords \ --zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-dynamodb-role

Lambda 함수 테스트

이 단계에서는 invoke AWS Lambda CLI 명령과 다음 샘플 DynamoDB 이벤트를 사용하여 수동으로 Lambda 함수를 호출합니다. 다음을 input.txt라는 파일에 복사합니다.

예 input.txt
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ] }

다음 invoke 명령을 실행합니다.

aws lambda invoke --function-name ProcessDynamoDBRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

cli-binary-format 옵션은 AWS CLI 버전 2를 사용할 때 필요합니다. 이 설정을 기본 설정으로 지정하려면 aws configure set cli-binary-format raw-in-base64-out을(를) 실행하세요. 자세한 내용은 AWS CLI 지원되는 글로벌 명령줄 옵션을 AWS Command Line Interface 사용 설명서 버전 2에서 참조하세요.

이 함수는 응답 본문에 문자열 message를 반환합니다.

outputfile.txt 파일의 출력을 확인합니다.

스트림을 활성화하여 DynamoDB 테이블 생성

스트림을 활성화하여 Amazon DynamoDB 테이블을 생성합니다.

DynamoDB 테이블을 생성하려면
  1. DynamoDB 콘솔을 엽니다.

  2. [Create table]을 선택합니다.

  3. 다음 설정을 사용하여 테이블을 생성합니다.

    • 테이블 이름lambda-dynamodb-stream

    • 기본 키id(문자열)

  4. 생성(Create)을 선택합니다.

스트림을 활성화하려면
  1. DynamoDB 콘솔을 엽니다.

  2. 테이블을 선택합니다.

  3. lambda-dynamodb-stream 테이블을 선택합니다.

  4. 내보내기 및 스트림에서 DynamoDB 스트림 세부 정보를 선택합니다.

  5. 켜기를 선택합니다.

  6. 보기 유형에서는 키 속성만을 선택합니다.

  7. 스트림 켜기를 선택합니다.

스트림 ARN을 적어둡니다. 다음 단계에서 스트림을 Lambda 함수와 연결할 때 필요합니다. 스트림 활성화에 대한 자세한 내용은 DynamoDB 스트림을 사용하여 Table Activity 캡처를 참조하세요.

AWS Lambda에서 이벤트 소스 추가

AWS Lambda에서 이벤트 소스 매핑을 생성합니다. 이 이벤트 소스 매핑은 DynamoDB 스트림을 Lambda 함수와 연결합니다. 이 이벤트 소스 매핑을 생성하면 AWS Lambda가 스트림 폴링을 시작합니다.

다음 AWS CLI create-event-source-mapping 명령을 실행합니다. 명령이 실행된 후 UUID를 적어둡니다. 예를 들어, 이벤트 소스 매핑을 삭제할 때처럼 모든 명령에서 이벤트 소스 매핑을 참조하려면 이 UUID가 필요합니다.

aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \ --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn

이는 지정된 DynamoDB 스트림과 Lambda 함수 사이의 매핑을 생성합니다. DynamoDB 스트림을 여러 Lambda 함수와 연결하고 동일한 Lambda 함수를 여러 스트림과 연결할 수 있습니다. 하지만 Lambda 함수는 공유하는 스트림에 대해 읽기 처리량을 공유합니다.

다음 명령을 실행하여 이벤트 소스 매핑 목록을 가져올 수 있습니다.

aws lambda list-event-source-mappings

이 목록은 사용자가 생성한 모든 이벤트 소스 매핑을 반환하며 각 매핑에 대해 LastProcessingResult를 표시합니다. 이 필드는 문제가 있을 경우 유용한 메시지를 제공하는 데 사용됩니다. No records processed(AWS Lambda가 폴링을 시작하지 않았거나 스트림에 레코드가 없음을 나타냄) 및 OK(AWS Lambda가 레코드를 성공적으로 읽고 Lambda 함수를 호출함을 나타냄) 같은 값은 아무 문제가 없다는 점을 나타냅니다. 문제가 있는 경우 오류 메시지를 수신합니다.

이벤트 소스 매핑이 많을 경우 함수 이름 파라미터를 사용하여 결과 범위를 좁히세요.

aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords

설정 테스트

종합적 경험을 테스트합니다. 테이블 업데이트를 수행할 때 DynamoDB는 이벤트 레코드를 스트림에 기록합니다. AWS Lambda가 스트림을 폴링하면 스트림의 새 레코드를 감지하고 이벤트를 함수에 전달하여 사용자를 대신하여 Lambda 함수를 호출합니다.

  1. DynamoDB 콘솔에서 테이블에 항목을 추가, 업데이트하고 항목을 삭제합니다. DynamoDB는 이러한 작업에 대한 레코드를 스트림에 기록합니다.

  2. AWS Lambda가 스트림을 폴링하고 스트림에 대한 업데이트를 감지하면 스트림에서 찾은 이벤트 데이터를 전달하여 Lambda 함수를 호출합니다.

  3. 함수는 Amazon CloudWatch에서 로그를 실행하고 생성합니다. Amazon CloudWatch 콘솔에서 보고된 로그를 확인할 수 있습니다.

리소스 정리

이 자습서 용도로 생성한 리소스를 보관하고 싶지 않다면 지금 삭제할 수 있습니다. 더 이상 사용하지 않는 AWS 리소스를 삭제하면 AWS 계정에 불필요한 요금이 발생하는 것을 방지할 수 있습니다.

Lambda 함수를 삭제하려면
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 생성한 함수를 선택합니다.

  3. 작업, 삭제를 선택합니다.

  4. 텍스트 입력 필드에 delete를 입력하고 Delete(삭제)를 선택합니다.

집행 역할 삭제
  1. IAM 콘솔에서 역할 페이지를 엽니다.

  2. 생성한 실행 역할을 선택합니다.

  3. Delete(삭제)를 선택합니다.

  4. 텍스트 입력 필드에 역할의 이름을 입력하고 Delete(삭제)를 선택합니다.

DynamoDB 테이블을 삭제하려면
  1. DynamoDB 콘솔의 테이블 페이지를 엽니다.

  2. 생성한 테이블을 선택합니다.

  3. Delete(삭제)를 선택합니다.

  4. 텍스트 상자에 delete를 입력합니다.

  5. 테이블 삭제를 선택합니다.