자습서: Kinesis Data Streams에서 Lambda 사용
이 자습서에서는 Amazon Kinesis 데이터 스트림의 이벤트를 소비하기 위한 Lambda 함수를 생성합니다.
-
사용자 지정 앱은 스트림에 레코드를 기록합니다.
-
AWS Lambda는 스트림에서 새로운 레코드가 감지될 때마다 스트림을 폴링하고 Lambda 함수를 간접 호출합니다.
-
AWS Lambda는 Lambda 함수를 생성할 때 지정한 실행 역할을 수임하여 Lambda 함수를 실행합니다.
사전 조건
이 자습서에서는 사용자가 기본 Lambda 작업과 Lambda 콘솔에 대해 어느 정도 알고 있다고 가정합니다. 그렇지 않은 경우 콘솔로 Lambda 함수 생성의 지침에 따라 첫 Lambda 함수를 생성합니다.
다음 단계를 완료하려면 AWS CLI 버전 2가 필요합니다. 명령과 예상 결과는 별도의 블록에 나열됩니다.
aws --version
다음 결과가 표시됩니다.
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
긴 명령의 경우 이스케이프 문자(\
)를 사용하여 명령을 여러 행으로 분할합니다.
Linux 및 macOS는 선호 셸과 패키지 관리자를 사용합니다.
Windows에서는 Lambda와 함께 일반적으로 사용하는 일부 Bash CLI 명령(예:zip
)은 운영 체제의 기본 제공 터미널에서 지원되지 않습니다. Ubuntu와 Bash의 Windows 통합 버전을 가져오려면 Linux용 Windows Subsystem을 설치합니다. 이 안내서의 예제 CLI 명령은 Linux 형식을 사용합니다. Windows CLI를 사용하는 경우 인라인 JSON 문서를 포함하는 명령의 형식을 다시 지정해야 합니다.
실행 역할 생성
함수에 AWS 리소스에 액세스할 수 있는 권한을 제공하는 실행 역할을 만듭니다.
실행 역할을 만들려면
-
IAM 콘솔에서 역할 페이지를 엽니다.
-
역할 생성을 선택합니다.
-
다음 속성을 사용하여 역할을 만듭니다.
-
신뢰할 수 있는 엔터티 – AWS Lambda.
-
권한 – AWSLambdaKinesisExecutionRole.
-
역할 이름 – lambda-kinesis-role
.
AWSLambdaKinesisExecutionRole 정책은 함수가 Kinesis에서 항목을 읽고 CloudWatch Logs에 로그를 쓰는 데 필요한 권한을 가집니다.
함수 생성
Kinesis 메시지를 처리하는 Lambda 함수를 생성합니다. 함수 코드는 Kinesis 레코드의 이벤트 ID 및 이벤트 데이터를 CloudWatch 로그에 기록합니다.
이 자습서에서는 Node.js 18.x 런타임을 사용하지만 다른 런타임 언어의 예제 코드도 제공했습니다. 다음 상자에서 탭을 선택하여 관심 있는 런타임에 대한 코드를 볼 수 있습니다. 이 단계에서 사용할 JavaScript 코드는 JavaScript 탭에 표시된 첫 번째 예제입니다.
- .NET
-
- AWS SDK for .NET
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
.NET을 사용하여 Lambda로 Kinesis 이벤트 사용
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
- Go
-
- SDK for Go V2
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Go를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"log"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
if len(kinesisEvent.Records) == 0 {
log.Printf("empty Kinesis event received")
return nil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
return nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK for Java 2.x
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Java를 사용하여 Lambda에서 Kinesis 이벤트를 사용합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
public class Handler implements RequestHandler<KinesisEvent, Void> {
@Override
public Void handleRequest(final KinesisEvent event, final Context context) {
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
return null;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
try {
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
return null;
}
}
- JavaScript
-
- SDK for JavaScript (v3)
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
JavaScript를 사용하여 Lambda로 Kinesis 이벤트 사용
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
TypeScript를 사용하여 Lambda로 Kinesis 이벤트 사용
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- SDK for PHP
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
PHP를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends KinesisHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleKinesis(KinesisEvent $event, Context $context): void
{
$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($records as $record) {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Python을 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):
for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new data
except Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
- Ruby
-
- SDK for Ruby
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Ruby를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue => err
$stderr.puts "An error occurred #{err}"
raise err
end
end
puts "Successfully processed #{event['Records'].length} records."
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work
# You can use Ruby's asynchronous programming tools like async/await or fibers here.
return data
end
- Rust
-
- SDK for Rust
-
GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Rust를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {
Ok(data) => {
// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
함수를 만들려면
-
프로젝트에 대한 디렉터리를 생성하고 해당 디렉터리로 전환합니다.
mkdir kinesis-tutorial
cd kinesis-tutorial
-
샘플 JavaScript 코드를 새 index.js
파일에 복사합니다.
-
배포 패키지를 만듭니다.
zip function.zip index.js
-
create-function
명령을 사용해 Lambda 함수를 만듭니다.
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-kinesis-role
Lambda 함수 테스트
invoke
AWS Lambda CLI 명령 및 샘플 Kinesis 이벤트를 사용하여 Lambda 함수를 수동으로 간접 호출합니다.
Lambda 함수를 테스트하려면
-
다음 JSON을 파일에 복사하고 input.txt
로 저장합니다.
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
}
]
}
-
invoke
명령을 사용하여 함수로 이벤트를 전송합니다.
aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
cli-binary-format 옵션은 AWS CLI 버전 2를 사용할 때 필요합니다. 이 설정을 기본 설정으로 지정하려면 aws configure set cli-binary-format raw-in-base64-out
을(를) 실행하세요. 자세한 내용은 AWS CLI 지원되는 글로벌 명령줄 옵션을 AWS Command Line Interface 사용 설명서 버전 2에서 참조하세요.
응답이 out.txt
로 저장됩니다.
create-stream
명령을 사용하여 스트림을 만듭니다.
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
다음 describe-stream
명령을 실행하여 스트림 ARN을 가져옵니다.
aws kinesis describe-stream --stream-name lambda-stream
다음 결과가 표시됩니다:
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920746074317682119384634633455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
"StreamName": "lambda-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1544828156.0
}
}
다음 단계에서 스트림 ARN을 사용하여 스트림을 Lambda 함수와 연결합니다.
다음 AWS CLI add-event-source
명령을 실행합니다.
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
나중에 사용할 수 있도록 매핑 ID를 기록해 둡니다. list-event-source-mappings
명령을 실행하여 이벤트 소스 매핑 목록을 가져올 수 있습니다.
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
응답에서 상태 값이 enabled
인지 확인할 수 있습니다. 이벤트 소스 매핑을 비활성화하여 폴링을 일시적으로 중지시켜 레코드 손실을 방지할 수 있습니다.
이벤트 소스 매핑을 테스트하려면 Kinesis 스트림에 이벤트 레코드를 추가합니다. --data
값은 Kinesis로 전송하기 전에 CLI가 base64로 인코딩하는 문자열입니다. 동일한 명령을 두 번 이상 실행하여 여러 레코드를 스트림에 추가할 수 있습니다.
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
Lambda에서는 실행 역할을 사용하여 스트림에서 레코드를 읽습니다. 그런 다음 Lambda 함수를 간접 호출해 레코드 배치를 전달합니다. 함수는 각 레코드에서 데이터를 디코딩하고 로깅해 CloudWatch Logs에 출력을 전송합니다. 로그는 CloudWatch 콘솔에서 확인합니다.
리소스 정리
이 자습서 용도로 생성한 리소스를 보관하고 싶지 않다면 지금 삭제할 수 있습니다. 더 이상 사용하지 않는 AWS 리소스를 삭제하면 AWS 계정에 불필요한 요금이 발생하는 것을 방지할 수 있습니다.
집행 역할 삭제
-
IAM 콘솔에서 역할 페이지를 엽니다.
-
생성한 실행 역할을 선택합니다.
-
Delete(삭제)를 선택합니다.
-
텍스트 입력 필드에 역할의 이름을 입력하고 Delete(삭제)를 선택합니다.
Lambda 함수를 삭제하려면
-
Lambda 콘솔의 함수 페이지를 엽니다.
-
생성한 함수를 선택합니다.
-
작업, 삭제를 선택합니다.
-
텍스트 입력 필드에 delete
를 입력하고 Delete(삭제)를 선택합니다.