Lambda에서 SQS 이벤트 소스에 대한 오류 처리 - AWS Lambda

Lambda에서 SQS 이벤트 소스에 대한 오류 처리

SQS 이벤트 소스와 관련된 오류를 처리하기 위해 Lambda는 백오프 전략과 함께 재시도 전략을 자동으로 사용합니다. 부분 배치 응답을 반환하도록 SQS 이벤트 소스 매핑을 구성하여 오류 처리 동작을 사용자 지정할 수도 있습니다.

실패한 호출에 대한 백오프 전략

호출이 실패하면 Lambda는 백오프 전략을 구현하면서 호출을 재시도합니다. 백오프 전략은 Lambda가 함수 코드의 오류로 인해 장애를 겪었는지 아니면 제한으로 인해 장애가 발생했는지에 따라 약간 다릅니다.

  • 함수 코드로 인해 오류가 발생한 경우 Lambda는 간접 호출 처리 및 재시도를 중지합니다. 그 동안 Lambda는 점진적으로 백오프를 수행하여 Amazon SQS 이벤트 소스 매핑에 할당되는 동시성의 양을 줄입니다. 대기열의 표시 제한 시간이 다 지나면 메시지가 대기열에 다시 나타납니다.

  • 제한으로 인해 호출이 실패하는 경우 Lambda는 Amazon SQS 이벤트 소스 매핑에 할당되는 동시성의 양을 줄여 재시도를 점진적으로 백오프합니다. Lambda는 메시지의 타임스탬프가 대기열의 가시성 제한 시간을 초과할 때(이때 Lambda가 메시지를 삭제)까지 메시지를 계속 재시도합니다.

부분 일괄 응답 구현

배치를 처리하는 동안 Lambda 함수에 오류가 발생하면 Lambda가 성공적으로 처리한 메시지를 포함하여 해당 배치의 모든 메시지가 기본적으로 대기열에 다시 표시됩니다. 따라서 함수가 동일한 메시지를 여러 번 처리할 수 있습니다.

실패한 배치에서 정상 처리된 메시지를 재처리하지 않으려면 실패한 메시지만 다시 표시하도록 이벤트 소스 매핑을 구성할 수 있습니다. 이를 부분 일괄 응답이라고 합니다. 부분 일괄 응답을 켜려면 이벤트 소스 매핑을 구성할 때 FunctionResponseTypes 작업에 대해 ReportBatchItemFailures을 지정하십시오. 그러면 함수가 부분적인 성공을 반환할 수 있으므로 레코드에 대한 불필요한 재시도 횟수를 줄일 수 있습니다.

ReportBatchItemFailures이 활성화되면 Lambda는 함수 호출이 실패할 때 메시지 폴링을 축소하지 않습니다. 일부 메시지에 오류가 발생할 것으로 예상되고 이러한 오류가 메시지 처리 속도에 영향을 미치지 않도록 하려면 ReportBatchItemFailures를 사용하십시오.

참고

부분 일괄 응답을 사용할 때는 다음 사항에 유의하세요.

  • 함수에서 예외가 발생한 경우 전체 배치가 완전한 실패로 간주됩니다.

  • FIFO 대기열과 함께 이 기능을 사용하는 경우 함수는 첫 번째 실패 후 메시지 처리를 중지하고 batchItemFailures에서 모든 실패한 메시지와 처리되지 않은 메시지를 반환해야 합니다. 그러면 대기열의 메시지 순서를 유지할 수 있습니다.

부분 배치 보고를 활성화 방법
  1. 부분 일괄 응답 구현에 대한 모범 사례를 검토하세요.

  2. 다음 명령을 실행하여 함수의 ReportBatchItemFailures을 활성화합니다. 이벤트 소스 매핑의 UUID를 가져오려면 list-event-source-mappings AWS CLI 명령을 실행합니다.

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 함수 코드를 업데이트하여 모든 예외를 포착하고 실패한 메시지를 batchItemFailures JSON 응답으로 반환하세요. batchItemFailures 응답에는 메시지 ID 목록이 itemIdentifier JSON 값으로 포함되어야 합니다.

    예를 들어 메시지 ID가 id1, id2, id3, id4, id5인 5개의 메시지로 구성된 배치가 있다고 가정합니다. 함수가 id1, id3, id5를 성공적으로 처리합니다. id2id4 메시지를 대기열에서 다시 볼 수 있도록 하려면 함수가 다음 응답을 리턴해야 합니다.

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    다음은 일괄적으로 실패한 메시지 ID 목록을 반환하는 함수 코드의 몇 가지 예입니다.

    .NET
    AWS SDK for .NET
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    .NET을 사용하여 Lambda로 SQS 배치 항목 실패 보고

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK for Go V2
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Go를 사용하여 Lambda로 SQS 배치 항목 실패 보고

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK for Java 2.x
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Java를 사용하여 Lambda로 SQS 배치 항목 실패 보고

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK for JavaScript (v3)
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    JavaScript를 사용하여 Lambda에서 SQS 배치 항목 실패를 보고합니다.

    // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    TypeScript를 사용하여 Lambda로 SQS 배치 항목 실패를 보고합니다.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    SDK for PHP
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    PHP를 사용하여 Lambda로 SQS 배치 항목 실패 보고

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK for Python (Boto3)
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Python을 사용하여 Lambda로 SQS 배치 항목 실패 보고

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK for Ruby
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Ruby를 사용하여 Lambda로 SQS 배치 항목 실패를 보고합니다.

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK for Rust
    참고

    GitHub에 더 많은 내용이 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Rust를 사용하여 Lambda로 SQS 배치 항목 실패를 보고합니다.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

실패한 이벤트가 대기열로 반환되지 않는 경우 AWS 지식 센터에서 Lambda 함수 SQS ReportBatchItemFailure의 문제를 해결하려면 어떻게 해야 합니까?를 참조하십시오.

성공 및 실패 조건

Lambda는 함수가 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.

  • 비어 있는 batchItemFailures 목록

  • null batchItemFailures 목록

  • 비어 있는 EventResponse

  • null EventResponse

Lambda는 함수가 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.

  • 잘못된 JSON 응답

  • 빈 문자열 itemIdentifier

  • null itemIdentifier

  • 키 이름이 잘못된 itemIdentifier

  • 존재하지 않는 메시지 ID가 있는 itemIdentifier

CloudWatch 지표

함수가 배치 항목 실패를 올바르게 보고하는지 확인하려면 Amazon CloudWatch에서 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 지표를 모니터링하면 됩니다.

  • NumberOfMessagesDeleted는 대기열에서 제거된 메시지 수를 추적합니다. 이 값이 0으로 떨어지면 함수 응답이 실패한 메시지를 올바르게 반환하지 않는다는 신호입니다.

  • ApproximateAgeOfOldestMessage는 가장 오래된 메시지가 대기열에 머물렀던 시간을 추적합니다. 이 지표가 급격히 증가하면 함수가 실패한 메시지를 올바르게 반환하지 않음을 나타낼 수 있습니다.