자습서: Kinesis Data Streams에서 Lambda 사용 - AWS Lambda

자습서: Kinesis Data Streams에서 Lambda 사용

이 자습서에서는 Amazon Kinesis 데이터 스트림의 이벤트를 소비하기 위한 Lambda 함수를 생성합니다.

  1. 사용자 지정 앱은 스트림에 레코드를 기록합니다.

  2. AWS Lambda는 스트림에서 새로운 레코드가 감지될 때마다 스트림을 폴링하고 Lambda 함수를 간접 호출합니다.

  3. AWS Lambda는 Lambda 함수를 생성할 때 지정한 실행 역할을 수임하여 Lambda 함수를 실행합니다.

사전 조건

이 자습서에서는 사용자가 기본 Lambda 작업과 Lambda 콘솔에 대해 어느 정도 알고 있다고 가정합니다. 그렇지 않은 경우 콘솔로 Lambda 함수 생성의 지침에 따라 첫 Lambda 함수를 생성합니다.

다음 단계를 완료하려면 AWS CLI 버전 2가 필요합니다. 명령과 예상 결과는 별도의 블록에 나열됩니다.

aws --version

다음 결과가 표시됩니다.

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

긴 명령의 경우 이스케이프 문자(\)를 사용하여 명령을 여러 행으로 분할합니다.

Linux 및 macOS는 선호 셸과 패키지 관리자를 사용합니다.

참고

Windows에서는 Lambda와 함께 일반적으로 사용하는 일부 Bash CLI 명령(예:zip)은 운영 체제의 기본 제공 터미널에서 지원되지 않습니다. Ubuntu와 Bash의 Windows 통합 버전을 가져오려면 Linux용 Windows Subsystem을 설치합니다. 이 안내서의 예제 CLI 명령은 Linux 형식을 사용합니다. Windows CLI를 사용하는 경우 인라인 JSON 문서를 포함하는 명령의 형식을 다시 지정해야 합니다.

실행 역할 생성

함수에 AWS 리소스에 액세스할 수 있는 권한을 제공하는 실행 역할을 만듭니다.

실행 역할을 만들려면
  1. IAM 콘솔에서 역할 페이지를 엽니다.

  2. 역할 생성을 선택합니다.

  3. 다음 속성을 사용하여 역할을 만듭니다.

    • 신뢰할 수 있는 엔터티AWS Lambda.

    • 권한AWSLambdaKinesisExecutionRole.

    • 역할 이름lambda-kinesis-role.

AWSLambdaKinesisExecutionRole 정책은 함수가 Kinesis에서 항목을 읽고 CloudWatch Logs에 로그를 쓰는 데 필요한 권한을 가집니다.

함수 생성

Kinesis 메시지를 처리하는 Lambda 함수를 생성합니다. 함수 코드는 Kinesis 레코드의 이벤트 ID 및 이벤트 데이터를 CloudWatch 로그에 기록합니다.

이 자습서에서는 Node.js 18.x 런타임을 사용하지만 다른 런타임 언어의 예제 코드도 제공했습니다. 다음 상자에서 탭을 선택하여 관심 있는 런타임에 대한 코드를 볼 수 있습니다. 이 단계에서 사용할 JavaScript 코드는 JavaScript 탭에 표시된 첫 번째 예제입니다.

.NET
AWS SDK for .NET
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

.NET을 사용하여 Lambda로 Kinesis 이벤트 사용

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
Go
SDK for Go V2
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Go를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "log" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { if len(kinesisEvent.Records) == 0 { log.Printf("empty Kinesis event received") return nil } for _, record := range kinesisEvent.Records { log.Printf("processed Kinesis event with EventId: %v", record.EventID) recordDataBytes := record.Kinesis.Data recordDataText := string(recordDataBytes) log.Printf("record data: %v", recordDataText) // TODO: Do interesting work based on the new data } log.Printf("successfully processed %v records", len(kinesisEvent.Records)) return nil } func main() { lambda.Start(handler) }
Java
SDK for Java 2.x
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Java를 사용하여 Lambda에서 Kinesis 이벤트를 사용합니다.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }
JavaScript
SDK for JavaScript (v3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

JavaScript를 사용하여 Lambda로 Kinesis 이벤트 사용

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

TypeScript를 사용하여 Lambda로 Kinesis 이벤트 사용

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
SDK for PHP
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

PHP를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Kinesis\KinesisHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends KinesisHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleKinesis(KinesisEvent $event, Context $context): void { $this->logger->info("Processing records"); $records = $event->getRecords(); foreach ($records as $record) { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Python을 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")
Ruby
SDK for Ruby
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Ruby를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue => err $stderr.puts "An error occurred #{err}" raise err end end puts "Successfully processed #{event['Records'].length} records." end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('UTF-8') # Placeholder for actual async work # You can use Ruby's asynchronous programming tools like async/await or fibers here. return data end
Rust
SDK for Rust
참고

GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Rust를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
함수를 만들려면
  1. 프로젝트에 대한 디렉터리를 생성하고 해당 디렉터리로 전환합니다.

    mkdir kinesis-tutorial cd kinesis-tutorial
  2. 샘플 JavaScript 코드를 새 index.js 파일에 복사합니다.

  3. 배포 패키지를 만듭니다.

    zip function.zip index.js
  4. create-function 명령을 사용해 Lambda 함수를 만듭니다.

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-kinesis-role

Lambda 함수 테스트

invoke AWS Lambda CLI 명령 및 샘플 Kinesis 이벤트를 사용하여 Lambda 함수를 수동으로 간접 호출합니다.

Lambda 함수를 테스트하려면
  1. 다음 JSON을 파일에 복사하고 input.txt로 저장합니다.

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
  2. invoke 명령을 사용하여 함수로 이벤트를 전송합니다.

    aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

    cli-binary-format 옵션은 AWS CLI 버전 2를 사용할 때 필요합니다. 이 설정을 기본 설정으로 지정하려면 aws configure set cli-binary-format raw-in-base64-out을(를) 실행하세요. 자세한 내용은 AWS CLI 지원되는 글로벌 명령줄 옵션을 AWS Command Line Interface 사용 설명서 버전 2에서 참조하세요.

    응답이 out.txt로 저장됩니다.

Kinesis 스트림 생성

create-stream 명령을 사용하여 스트림을 만듭니다.

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

다음 describe-stream 명령을 실행하여 스트림 ARN을 가져옵니다.

aws kinesis describe-stream --stream-name lambda-stream

다음 결과가 표시됩니다:

{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}

다음 단계에서 스트림 ARN을 사용하여 스트림을 Lambda 함수와 연결합니다.

AWS Lambda에서 이벤트 소스 추가

다음 AWS CLI add-event-source 명령을 실행합니다.

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

나중에 사용할 수 있도록 매핑 ID를 기록해 둡니다. list-event-source-mappings 명령을 실행하여 이벤트 소스 매핑 목록을 가져올 수 있습니다.

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream

응답에서 상태 값이 enabled인지 확인할 수 있습니다. 이벤트 소스 매핑을 비활성화하여 폴링을 일시적으로 중지시켜 레코드 손실을 방지할 수 있습니다.

설정 테스트

이벤트 소스 매핑을 테스트하려면 Kinesis 스트림에 이벤트 레코드를 추가합니다. --data 값은 Kinesis로 전송하기 전에 CLI가 base64로 인코딩하는 문자열입니다. 동일한 명령을 두 번 이상 실행하여 여러 레코드를 스트림에 추가할 수 있습니다.

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda에서는 실행 역할을 사용하여 스트림에서 레코드를 읽습니다. 그런 다음 Lambda 함수를 간접 호출해 레코드 배치를 전달합니다. 함수는 각 레코드에서 데이터를 디코딩하고 로깅해 CloudWatch Logs에 출력을 전송합니다. 로그는 CloudWatch 콘솔에서 확인합니다.

리소스 정리

이 자습서 용도로 생성한 리소스를 보관하고 싶지 않다면 지금 삭제할 수 있습니다. 더 이상 사용하지 않는 AWS 리소스를 삭제하면 AWS 계정에 불필요한 요금이 발생하는 것을 방지할 수 있습니다.

집행 역할 삭제
  1. IAM 콘솔에서 역할 페이지를 엽니다.

  2. 생성한 실행 역할을 선택합니다.

  3. Delete(삭제)를 선택합니다.

  4. 텍스트 입력 필드에 역할의 이름을 입력하고 Delete(삭제)를 선택합니다.

Lambda 함수를 삭제하려면
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 생성한 함수를 선택합니다.

  3. 작업, 삭제를 선택합니다.

  4. 텍스트 입력 필드에 delete를 입력하고 Delete(삭제)를 선택합니다.

Kinesis 스트림을 삭제하려면
  1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

  2. 생성한 스트림을 선택합니다.

  3. 작업, 삭제를 선택합니다.

  4. 텍스트 입력 필드에 delete을 입력합니다.

  5. Delete(삭제)를 선택합니다.