本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Lambda 中處理 SQS 事件來源的錯誤
為了處理與 SQS 事件來源相關的錯誤,Lambda 會自動使用重試策略搭配退避策略。您也可以透過設定 SQS 事件來源映射,來自訂錯誤處理行為,以傳回部分批次回應。
失敗調用的輪詢策略
當調用失敗,Lambda 會在實作輪詢策略時嘗試重試調用。根據 Lambda 是否因函數程式碼錯誤或限流而發生失敗,輪詢策略會略有不同。
-
如果是函數程式碼導致錯誤,則 Lambda 會停止處理並重試調用。同時,Lambda 會逐漸退避,減少分配給 Amazon SQS 事件來源映射的並行數量。在佇列的可見性逾時時間到了之後,訊息會再次重新出現在佇列中。
-
如果是限流導致調用失敗,Lambda 會減少分配給 Amazon SQS 事件來源映射的並行數量,逐漸重試輪詢。Lambda 會持續重試訊息,直到訊息的時間戳記超過佇列的可見性逾時為止,此時 Lambda 會捨棄訊息。
實作部分批次回應
當 Lambda 函數在處理批次時遇到錯誤,根據預設,該批次中的所有訊息會再次顯示在佇列中,包含 Lambda 已順利處理的訊息。因此,您的函數可能最後會處理數次相同的訊息。
若要避免重新處理失敗批次中成功處理過的訊息,您可以設定事件來源映射,僅讓失敗的訊息再次可見。我們將其稱為部分批次回應。若要開啟部分批次回應,請在設定事件來源映射時為 FunctionResponseTypes 動作指定 ReportBatchItemFailures
。這可以讓您的函數傳回部分成功,有助於減少記錄上不必要的重試次數。
啟動 ReportBatchItemFailures
時,Lambda 不會在函數調用失敗時縮減訊息輪詢的規模。如果您預期部分訊息會失敗,且不希望這些失敗影響到訊息的處理速度,則請使用 ReportBatchItemFailures
。
若要啟動部分批次報告
-
檢閱實作部分批次回應的最佳實務。
-
執行以下命令以便為函數啟用 ReportBatchItemFailures
。若要擷取事件來源映射的 UUID,請執行 list-event-source-mappings AWS CLI 命令。
aws lambda update-event-source-mapping \
--uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"
\
--function-response-types "ReportBatchItemFailures"
-
更新函數程式碼以擷取所有例外狀況,並傳回 batchItemFailures
JSON 回應中的失敗訊息。batchItemFailures
回應必須含有以 itemIdentifier
JSON 值表示的訊息 ID 清單。
例如,假設您有五則訊息的批次,其中,訊息 ID 分別為 id1
、id2
、id3
、id4
,以及 id5
。您的函數已成功處理 id1
、id3
,以及 id5
。若要讓訊息 id2
和 id4
再次於佇列中可見,您的函數應傳回以下回應:
{
"batchItemFailures": [
{
"itemIdentifier": "id2"
},
{
"itemIdentifier": "id4"
}
]
}
以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單:
- .NET
-
- AWS SDK for .NET
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 .NET 搭配 Lambda 報告 SQS 批次項目失敗。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace sqsSample;
public class Function
{
public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
{
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
foreach(var message in evnt.Records)
{
try
{
//process your message
await ProcessMessageAsync(message, context);
}
catch (System.Exception)
{
//Add failed message identifier to the batchItemFailures list
batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId});
}
}
return new SQSBatchResponse(batchItemFailures);
}
private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
{
if (String.IsNullOrEmpty(message.Body))
{
throw new Exception("No Body in SQS Message.");
}
context.Logger.LogInformation($"Processed message {message.Body}");
// TODO: Do interesting work based on the new message
await Task.CompletedTask;
}
}
- Go
-
- SDK for Go V2
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Go 搭配 Lambda 報告 SQS 批次項目失敗。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
batchItemFailures := []map[string]interface{}{}
for _, message := range sqsEvent.Records {
if /* Your message processing condition here */ {
batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
}
}
sqsBatchResponse := map[string]interface{}{
"batchItemFailures": batchItemFailures,
}
return sqsBatchResponse, nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK for Java 2.x
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Java 搭配 Lambda 報告 SQS 批次項目失敗。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import java.util.ArrayList;
import java.util.List;
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
String messageId = "";
for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
try {
//process your message
messageId = message.getMessageId();
} catch (Exception e) {
//Add failed message identifier to the batchItemFailures list
batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId));
}
}
return new SQSBatchResponse(batchItemFailures);
}
}
- JavaScript
-
- SDK for JavaScript (v3)
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 報告 SQS 批次項目失敗。
// Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
export const handler = async (event, context) => {
const batchItemFailures = [];
for (const record of event.Records) {
try {
await processMessageAsync(record, context);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return { batchItemFailures };
};
async function processMessageAsync(record, context) {
if (record.body && record.body.includes("error")) {
throw new Error("There is an error in the SQS Message.");
}
console.log(`Processed message: ${record.body}`);
}
使用 TypeScript 搭配 Lambda 報告 SQS 批次項目失敗。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
const batchItemFailures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
try {
await processMessageAsync(record);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return {batchItemFailures: batchItemFailures};
};
async function processMessageAsync(record: SQSRecord): Promise<void> {
if (record.body && record.body.includes("error")) {
throw new Error('There is an error in the SQS Message.');
}
console.log(`Processed message ${record.body}`);
}
- PHP
-
- SDK for PHP
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 PHP 搭配 Lambda 報告 SQS 批次項目失敗。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
use Bref\Context\Context;
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends SqsHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleSqs(SqsEvent $event, Context $context): void
{
$this->logger->info("Processing SQS records");
$records = $event->getRecords();
foreach ($records as $record) {
try {
// Assuming the SQS message is in JSON format
$message = json_decode($record->getBody(), true);
$this->logger->info(json_encode($message));
// TODO: Implement your custom processing logic here
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$this->markAsFailed($record);
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords SQS records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Python 搭配 Lambda 報告 SQS 批次項目失敗。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
if event:
batch_item_failures = []
sqs_batch_response = {}
for record in event["Records"]:
try:
# process message
except Exception as e:
batch_item_failures.append({"itemIdentifier": record['messageId']})
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
- Ruby
-
- SDK for Ruby
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 報告 SQS 批次項目失敗。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'json'
def lambda_handler(event:, context:)
if event
batch_item_failures = []
sqs_batch_response = {}
event["Records"].each do |record|
begin
# process message
rescue StandardError => e
batch_item_failures << {"itemIdentifier" => record['messageId']}
end
end
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
end
end
- Rust
-
- SDK for Rust
-
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Rust 搭配 Lambda 報告 SQS 批次項目失敗。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
event::sqs::{SqsBatchResponse, SqsEvent},
sqs::{BatchItemFailure, SqsMessage},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn process_record(_: &SqsMessage) -> Result<(), Error> {
Err(Error::from("Error processing message"))
}
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
let mut batch_item_failures = Vec::new();
for record in event.payload.records {
match process_record(&record).await {
Ok(_) => (),
Err(_) => batch_item_failures.push(BatchItemFailure {
item_identifier: record.message_id.unwrap(),
}),
}
}
Ok(SqsBatchResponse {
batch_item_failures,
})
}
#[tokio::main]
async fn main() -> Result<(), Error> {
run(service_fn(function_handler)).await
}
如果失敗的事件未傳回佇列,請參閱 AWS 知識中心的如何對 Lambda 函數 SQS ReportBatchItemFailures? 進行故障診斷。
成功與失敗條件
如果您的函數傳回下列任一項目,Lambda 會將批次視為完全成功:
如果您的函數傳回下列任一項目,Lambda 會將批次視為完全失敗:
CloudWatch 指標
若要判斷您的函數是否正確報告批次項目失敗,您可以在 Amazon CloudWatch 中監控 NumberOfMessagesDeleted
和 ApproximateAgeOfOldestMessage
Amazon SQS 指標。