Lambda 함수를 사용하여 데이터 사전 처리 - Amazon Kinesis Data Analytics for SQL Applications 개발자 안내서

신중한 고려 후 다음 두 단계로 Amazon Kinesis Data Analytics for SQL applications를 중단하기로 결정했습니다.

1. 2025년 10월 15일부터 SQL 애플리케이션을 위한 새 Kinesis Data Analytics를 생성할 수 없습니다.

2. 2026년 1월 27일부터 애플리케이션이 삭제됩니다. SQL 애플리케이션용 Amazon Kinesis Data Analytics를 시작하거나 운영할 수 없습니다. SQL 해당 시점부터에 대한 Amazon Kinesis Data Analytics에 대한 지원을 더 이상 사용할 수 없습니다. 자세한 내용은 Amazon Kinesis Data Analytics for SQL Applications 중단 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Lambda 함수를 사용하여 데이터 사전 처리

참고

2023년 9월 12일 이후에는 Kinesis Data Analytics for SQL의 기존 사용자가 아닌 경우, Kinesis Data Firehose를 소스로 사용하여 새 애플리케이션을 생성할 수 없습니다. 자세한 설명은 제한을 참조하십시오.

스트림의 데이터에 대해 형식 변환, 전환, 강화 또는 필터링이 필요한 경우 AWS Lambda 함수를 사용하여 데이터를 사전 처리할 수 있습니다. 애플리케이션 SQL 코드가 실행되기 전에 또는 애플리케이션이 데이터 스트림에서 스키마를 생성하기 전에 이 작업을 수행할 수 있습니다.

Lambda 함수를 사용하여 레코드를 사전 처리하면 다음과 같은 경우에 유용합니다:

  • 다른 형식 (예: KPL 또는 GZIP)의 레코드를 Kinesis Data Analytics가 분석할 수 있는 형식으로 변환. Kinesis Data Analytics는 현재 JSON 또는 CSV 데이터 형식을 지원합니다.

  • 집계, 변칙 검색 등과 같은 작업을 위해 액세스하기 쉬운 형식으로 데이터 확장. 예를 들어, 여러 데이터 값을 하나의 문자열에 함께 저장할 경우 데이터를 별도 열로 확장할 수 있습니다.

  • 다른 Amazon 서비스 (예: 외삽, 오류 수정 등)의 데이터 강화.

  • 레코드 필드에 복잡한 문자열 변환 적용

  • 데이터 정리를 위해 데이터 필터링

Lambda 함수를 사용하여 레코드 사전 처리

Kinesis Data Analytics 애플리케이션을 생성할 때 소스에 연결 페이지에서 Lambda 사전 처리를 활성화합니다.

Kinesis Data Analytics 애플리케이션에서 Lambda 함수를 사용하여 레코드를 사전 처리하려면
  1. AWS Management Console에 로그인하고 https://console.aws.amazon.com/kinesisanalytics에서 Managed Service for Apache Flink 콘솔을 엽니다.

  2. 애플리케이션의 소스에 연결 페이지의 사전 처리 기록 AWS Lambda 항목 섹션에서 활성화를 선택합니다.

  3. 이미 생성된 Lambda 함수를 사용하려면 Lambda 함수 드롭다운 목록에서 함수를 선택합니다.

  4. Lambda 사전 처리 템플릿 중 하나에서 새 Lambda 함수를 생성하려면 드롭다운 목록에서 해당 템플릿을 선택합니다. 그런 다음 View <template name> in Lambda(Lambda에서 <템플릿 명칭> 보기)를 선택하여 함수를 편집합니다.

  5. 새 Lambda 함수를 생성하려면 새로운 생성을 선택합니다. Lambda 함수 생성에 관한 정보는 AWS Lambda개발자 가이드HelloWorld Lambda 함수 생성 및 콘솔 탐색을 참조하십시오.

  6. 사용할 Lambda 함수의 버전을 선택합니다. 최신 버전을 사용하려면 [$LATEST]를 선택합니다.

레코드 사전 처리를 위해 Lambda 함수를 선택하거나 생성할 경우 애플리케이션 SQL 코드가 실행되거나 애플리케이션에서 레코드로부터 스키마를 생성하기 이전에 레코드가 사전 처리됩니다.

Lambda 사전 처리 권한

Lambda 사전 처리를 사용하려면 애플리케이션의 IAM 역할에 다음과 같은 권한 정책이 필요합니다.

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda 사전 처리 지표

Amazon CloudWatch를 사용하여 Lambda 간접 호출 수, 처리된 바이트 수, 성공 및 실패 등을 모니터링할 수 있습니다. Kinesis Data Analytics Lambda 사전 처리에서 발생되는 CloudWatch 지표에 대한 자세한 설명은 Amazon Kinesis Analytics 지표를 참조하십시오.

Kinesis Producer Library와 함께 AWS Lambda 사용

Kinesis Producer Library(KPL)는 사용자 형식의 작은 레코드를 최대 1MB의 큰 레코드로 집계하여 Amazon Kinesis Data Streams 처리량을 효율적으로 활용할 수 있게 합니다. Java용 Kinesis Client Library(KCL)은 이러한 레코드의 분해를 지원합니다. 그러나 AWS Lambda을 스트림 소비자로 사용할 경우 특수 모듈을 사용하여 레코드를 분해해야 합니다.

필요한 프로젝트 코드와 지침을 얻으려면 GitHub에서 AWS Lambda를 위한 Kinesis Producer Library Deaggregation 모듈을 참조하십시오. 이 구성 요소를 프로젝트에 Java, Node.js 및 Python으로 AWS Lambda에서 직렬화된 KPL 데이터를 처리할 수 있습니다. 복수의 언어 KCL 애플리케이션의 일부로 이러한 구성 요소를 사용할 수 있습니다.

데이터 사전 처리 이벤트 입력 데이터 모델/레코드 응답 모델

레코드를 사전 처리하려면 Lambda 함수가 필수 이벤트 입력 데이터 및 레코드 응답 모델을 준수해야 합니다.

이벤트 입력 데이터 모델

Kinesis Data Analytics는 Kinesis 데이터 스트림 또는 Firehose 전송 스트림에서 데이터를 연속적으로 읽습니다. 추출하는 각각의 배치 레코드의 경우 서비스가 Lambda 함수에 전달되는 방법을 관리합니다. 함수에서는 레코드 목록을 입력으로 수신합니다. 함수 내에서 목록을 반복하고 비즈니스 로직을 적용하여 사전 처리 요구 사항(예: 데이터 포맷 변환 또는 강화)을 충족합니다.

사전 처리 함수에 대한 입력 모델은 데이터가 Kinesis 데이터 스트림에서 수신되었는지 Firehose 전송 스트림에서 수신되었는지에 따라 조금씩 달라집니다.

소스가 Firehose 전송 스트림인 경우 이벤트 입력 데이터 모델은 다음과 같습니다.

Kinesis Data Firehose 요청 데이터 모델

필드 설명
invocationId Lambda 간접 호출 ID(임의의 GUID)
applicationArn Kinesis Data Analytics 애플리케이션 Amazon 리소스 이름(ARN)
streamArn 전송 스트림 ARN
레코드
필드 설명
recordId 레코드 ID(임의 GUID)
kinesisFirehoseRecordMetadata
필드 설명
approximateArrivalTimestamp 전송 스트림 레코드의 대략적인 도착 시간
data Base64 인코딩된 소스 레코드 페이로드

다음 예는 Firehose 전송 스트림의 입력을 보여 줍니다.

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

소스가 Kinesis 데이터 스트림인 경우 이벤트 입력 데이터 모델은 다음과 같습니다:

Kinesis 스트림 요청 데이터 모델

필드 설명
invocationId Lambda 간접 호출 ID(임의의 GUID)
applicationArn Kinesis Data Analytics 애플리케이션 ARN
streamArn 전송 스트림 ARN
레코드
필드 설명
recordId Kinesis 레코드 시퀀스 번호를 기반으로 하는 레코드 ID
kinesisStreamRecordMetadata
필드 설명
sequenceNumber Kinesis 스트림 레코드의 시퀀스 번호
partitionKey Kinesis 스트림 레코드의 파티션 키
shardId ShardId Kinesis 스트림 레코드의
approximateArrivalTimestamp 전송 스트림 레코드의 대략적인 도착 시간
data Base64 인코딩된 소스 레코드 페이로드

다음 예는 Kinesis 데이터 스트림의 입력을 보여 줍니다.

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

레코드 응답 모델

Lambda 함수에 전송되는 Lambda 사전 처리 함수(레코드 ID 포함)에서 반환된 모든 레코드가 반환되어야 합니다. 이러한 레코드는 다음 파라미터가 포함되어 있어야 하며 그렇지 않으면 Kinesis Data Analytics가 이를 거부하고 데이터 사전 처리 실패로 간주합니다. 레코드의 데이터 페이로드 부분을 사전 처리 요구 사항에 맞게 변환할 수 있습니다.

응답 데이터 모델

레코드
필드 설명
recordId 레코드 ID는 간접 호출 중에 Kinesis Data Analytics에서 Lambda로 전달됩니다. 변환된 레코드에는 동일한 레코드 ID가 포함되어야 합니다. 원래 레코드의 ID와 변환된 레코드의 ID 간 불일치는 데이터 사전 처리 실패로 간주됩니다.
result 레코드의 데이터 변환 상태입니다. 가능한 값은 다음과 같습니다.
  • Ok: 레코드가 성공적으로 변환되었습니다. Kinesis Data Analytics는 SQL 처리를 위해 레코드를 수집합니다.

  • Dropped: 처리 로직에 의해 레코드가 의도적으로 삭제되었습니다. Kinesis Data Analytics는 SQL 처리에서 레코드를 삭제합니다. Dropped 레코드에 대해 데이터 페이로드 필드는 선택 사항입니다.

  • ProcessingFailed: 레코드를 변환할 수 없습니다. Kinesis Data Analytics는 Lambda 함수가 이를 성공적으로 처리하지 못한 것으로 간주하고 오류 스트림에 오류를 기록합니다. 오류 스트림에 대한 자세한 설명은 오류 처리 섹션을 참조하십시오. ProcessingFailed 레코드에 대해 데이터 페이로드 필드는 선택 사항입니다.

data base64 인코딩 후 변환된 데이터 페이로드입니다. 각 데이터 페이로드는 애플리케이션 수집 데이터 형식이 JSON인 경우 여러 JSON 문서를 포함할 수 있습니다. 또는 애플리케이션 수집 데이터 형식이 CSV인 경우 여러 CSV 행(각 행에서 지정된 행 구분 기호 사용)을 포함할 수 있습니다. Kinesis Data Analytics 서비스가 여러 JSON 문서 또는 동일한 데이터 페이로드 내 CSV 행에서 데이터를 구문 분석하고 처리합니다.

다음 예는 Lambda 함수의 출력을 보여 줍니다.

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

일반 데이터 사전 처리 기능

다음은 사전 처리에 실패할 수 있는 일반적인 이유입니다.

  • Lambda 함수에 전송되는 배치 레코드 (레코드 ID 포함) 전체가 서비스에 반환 되지는 않습니다.

  • 응답에 레코드 ID, 상태 또는 데이터 페이로드 필드가 없습니다. Dropped 또는 ProcessingFailed 레코드에 대해 데이터 페이로드 필드는 선택 사항입니다.

  • Lambda 함수 제한 시간이 데이터를 사전 처리하는 데 충분하지 않습니다.

  • Lambda 함수 응답이 AWS Lambda 서비스에서 부여한 응답 한도를 초과합니다.

데이터 사전 처리에 실패한 경우 Kinesis Data Analytics는 동일한 레코드 세트에 대해 호출을 성공할 때까지 계속 재시도합니다. 다음 CloudWatch 지표를 모니터링하면 실패의 원인을 알 수 있습니다.

  • Kinesis Data Analytics 애플리케이션 MillisBehindLatest: 애플리케이션이 스트리밍 소스에서 읽어오는 시간이 얼마나 뒤처져 있는 지를 나타냅니다.

  • Kinesis Data Analytics 애플리케이션 InputPreprocessing CloudWatch 지표: 통계 중에서 성공 횟수와 실패 횟수를 나타냅니다. 자세한 설명은 Amazon Kinesis Analytics 지표를 참조하십시오.

  • AWS Lambda 함수 CloudWatch 지표 및 로그.