Kinesis Data Streams 및 Lambda로 부분 배치 응답 구성
이벤트 소스에서 스트리밍 데이터를 사용하고 처리할 때 기본적으로 Lambda는 배치가 완전히 성공한 경우에만 배치의 가장 높은 시퀀스 번호로 체크포인트를 수행합니다. Lambda는 다른 모든 결과를 완전한 실패로 처리하고 재시도 제한까지 배치 처리를 재시도합니다. 스트림에서 배치를 처리하는 동안 부분적인 성공을 허용하려면 ReportBatchItemFailures
를 설정합니다. 부분적인 성공을 허용하면 레코드에 대한 재시도 횟수를 줄이는 데 도움이 되지만 성공한 레코드의 재시도 가능성을 완전히 막지는 못합니다.
ReportBatchItemFailures
를 켜려면 FunctionResponseTypes 목록에 열거형 값 ReportBatchItemFailures
를 포함시킵니다. 이 목록은 함수에 대해 활성화된 응답 유형을 나타냅니다. 이벤트 소스 매핑을 생성하거나 업데이트할 때 이 목록을 구성할 수 있습니다.
보고서 구문
배치 항목 실패에 대한 보고를 구성할 때 StreamsEventResponse
클래스는 배치 항목 실패 목록과 함께 반환됩니다. StreamsEventResponse
객체를 사용하여 배치에서 첫 번째 실패한 레코드의 시퀀스 번호를 반환할 수 있습니다. 올바른 응답 구문을 사용하여 고유한 사용자 지정 클래스를 생성할 수도 있습니다. 다음 JSON 구조는 필요한 응답 구문을 보여줍니다.
{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
참고
batchItemFailures
어레이에 여러 항목이 포함되어 있으면 Lambda는 시퀀스 번호가 가장 낮은 레코드를 체크포인트로 사용합니다. 그런 다음 Lambda는 해당 체크포인트에서 시작하여 모든 레코드를 다시 시도합니다.
성공 및 실패 조건
Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.
-
비어 있는
batchItemFailure
목록 -
null
batchItemFailure
목록 -
비어 있는
EventResponse
-
null
EventResponse
Lambda는 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.
-
빈 문자열
itemIdentifier
-
null
itemIdentifier
-
키 이름이 잘못된
itemIdentifier
Lambda는 재시도 전략에 따라 실패를 재시도합니다.
배치 이등분
호출이 실패하고 BisectBatchOnFunctionError
가 활성화되어 있으면 ReportBatchItemFailures
설정에 관계 없이 배치가 이등분됩니다.
부분적 배치 성공 응답이 수신되고 BisectBatchOnFunctionError
및 ReportBatchItemFailures
가 모두 활성화되면 배치가 반환된 시퀀스 번호에서 이등분되고 Lambda는 나머지 레코드만 재시도합니다.
다음은 일괄적으로 실패한 메시지 ID 목록을 반환하는 함수 코드의 몇 가지 예입니다.
- .NET
-
- AWS SDK for .NET
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. .NET을 사용하여 Lambda로 Kinesis 배치 항목 실패 보고
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }
- Go
-
- SDK for Go V2
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. Go를 사용하여 Lambda로 Kinesis 배치 항목 실패를 보고합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, record := range kinesisEvent.Records { curRecordSequenceNumber := "" // Process your record if /* Your record processing condition here */ { curRecordSequenceNumber = record.Kinesis.SequenceNumber } // Add a condition to check if the record processing failed if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber}) } } kinesisBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return kinesisBatchResponse, nil } func main() { lambda.Start(handler) }
- Java
-
- SDK for Java 2.x
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. Java를 사용하여 Lambda로 Kinesis 배치 항목 실패 보고.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
- JavaScript
-
- SDK for JavaScript (v3)
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. Javascript를 사용하여 Lambda로 Kinesis 배치 항목 실패 보고
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
TypeScript를 사용하여 Lambda로 Kinesis 배치 항목 실패 보고
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
- PHP
-
- SDK for PHP
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. PHP를 사용하여 Lambda로 Kinesis 배치 항목 실패를 보고합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $kinesisEvent = new KinesisEvent($event); $this->logger->info("Processing records"); $records = $kinesisEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. Python을 사용하여 Lambda로 Kinesis 배치 항목 실패 보고.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
- Ruby
-
- SDK for Ruby
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. Ruby를 사용하여 Lambda로 Kinesis 배치 항목 실패를 보고합니다.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) batch_item_failures = [] event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue StandardError => err puts "An error occurred #{err}" # Since we are working with streams, we can return the failed item immediately. # Lambda will immediately begin to retry processing from this failed item onwards. return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] } end end puts "Successfully processed #{event['Records'].length} records." { batchItemFailures: batch_item_failures } end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('utf-8') # Placeholder for actual async work sleep(1) data end
- Rust
-
- SDK for Rust
-
참고
GitHub에 더 많은 내용이 있습니다. 서버리스 예제
리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다. Rust를 사용하여 Lambda로 Kinesis 배치 항목 실패를 보고합니다.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }