在 Lambda 中處理 SQS 事件來源的錯誤 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Lambda 中處理 SQS 事件來源的錯誤

為了處理與 SQS 事件來源相關的錯誤,Lambda 會自動使用重試策略搭配退避策略。您也可以透過設定 SQS 事件來源映射,來自訂錯誤處理行為,以傳回部分批次回應

失敗調用的輪詢策略

當調用失敗,Lambda 會在實作輪詢策略時嘗試重試調用。根據 Lambda 是否因函數程式碼錯誤或限流而發生失敗,輪詢策略會略有不同。

  • 如果是函數程式碼導致錯誤,則 Lambda 會停止處理並重試調用。同時,Lambda 會逐漸退避,減少分配給 Amazon SQS 事件來源映射的並行數量。在佇列的可見性逾時時間到了之後,訊息會再次重新出現在佇列中。

  • 如果是限流導致調用失敗,Lambda 會減少分配給 Amazon SQS 事件來源映射的並行數量,逐漸重試輪詢。Lambda 會持續重試訊息,直到訊息的時間戳記超過佇列的可見性逾時為止,此時 Lambda 會捨棄訊息。

實作部分批次回應

當 Lambda 函數在處理批次時遇到錯誤,根據預設,該批次中的所有訊息會再次顯示在佇列中,包含 Lambda 已順利處理的訊息。因此,您的函數可能最後會處理數次相同的訊息。

若要避免重新處理失敗批次中成功處理過的訊息,您可以設定事件來源映射,僅讓失敗的訊息再次可見。我們將其稱為部分批次回應。若要開啟部分批次回應,請在設定事件來源映射時為 FunctionResponseTypes 動作指定 ReportBatchItemFailures。這可以讓您的函數傳回部分成功,有助於減少記錄上不必要的重試次數。

啟動 ReportBatchItemFailures 時,Lambda 不會在函數調用失敗時縮減訊息輪詢的規模。如果您預期部分訊息會失敗,且不希望這些失敗影響到訊息的處理速度,則請使用 ReportBatchItemFailures

注意

使用部分批次回應時,請注意下列事項:

  • 如果函數擲出例外情況,便會將整個批次視為完全失敗。

  • 如果您將此功能與 FIFO 佇列一起使用,您的函數應該在第一次失敗後停止處理訊息,並傳回 batchItemFailures 中所有失敗與尚未處理的訊息。這有助於保留佇列中訊息的順序。

若要啟動部分批次報告
  1. 檢閱實作部分批次回應的最佳實務

  2. 執行以下命令以便為函數啟用 ReportBatchItemFailures。若要擷取事件來源映射的 UUID,請執行 list-event-source-mappings AWS CLI 命令。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 更新函數程式碼以擷取所有例外狀況,並傳回 batchItemFailures JSON 回應中的失敗訊息。batchItemFailures 回應必須含有以 itemIdentifier JSON 值表示的訊息 ID 清單。

    例如,假設您有五則訊息的批次,其中,訊息 ID 分別為 id1id2id3id4,以及 id5。您的函數已成功處理 id1id3,以及 id5。若要讓訊息 id2id4 再次於佇列中可見,您的函數應傳回以下回應:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單:

    .NET
    AWS SDK for .NET
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 .NET 搭配 Lambda 報告 SQS 批次項目失敗。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK for Go V2
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Go 搭配 Lambda 報告 SQS 批次項目失敗。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK for Java 2.x
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Java 搭配 Lambda 報告 SQS 批次項目失敗。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK for JavaScript (v3)
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 JavaScript 搭配 Lambda 報告 SQS 批次項目失敗。

    // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    使用 TypeScript 搭配 Lambda 報告 SQS 批次項目失敗。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    SDK for PHP
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 PHP 搭配 Lambda 報告 SQS 批次項目失敗。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK for Python (Boto3)
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Python 搭配 Lambda 報告 SQS 批次項目失敗。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK for Ruby
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Ruby 搭配 Lambda 報告 SQS 批次項目失敗。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK for Rust
    注意

    GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Rust 搭配 Lambda 報告 SQS 批次項目失敗。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

如果失敗的事件未傳回佇列,請參閱 AWS 知識中心的如何對 Lambda 函數 SQS ReportBatchItemFailures? 進行故障診斷

成功與失敗條件

如果您的函數傳回下列任一項目,Lambda 會將批次視為完全成功:

  • 空白 batchItemFailures 清單

  • Null batchItemFailures 清單

  • 空白 EventResponse

  • Null EventResponse

如果您的函數傳回下列任一項目,Lambda 會將批次視為完全失敗:

  • 無效的 JSON 回應

  • 空白字串 itemIdentifier

  • Null itemIdentifier

  • 具有錯誤金鑰名稱的 itemIdentifier

  • 具有不存在之訊息 ID 的 itemIdentifier

CloudWatch 指標

若要判斷您的函數是否正確報告批次項目失敗,您可以在 Amazon CloudWatch 中監控 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 指標。

  • NumberOfMessagesDeleted 會追蹤從佇列移除的訊息數目。如果下降到 0,表示您的函數回應並未正確傳回失敗訊息。

  • ApproximateAgeOfOldestMessage 會追蹤最舊訊息停留在佇列中的時間長度。此指標的急劇增加可能表示您的函數並未正確傳回失敗訊息。