Gestión de errores para un origen de eventos de SQS en Lambda
Para gestionar los errores relacionados con un origen de eventos de SQS, Lambda utiliza automáticamente una estrategia de reintento con una estrategia de retroceso. También puede personalizar el comportamiento de la gestión de errores si configura la asignación de orígenes de eventos de SQS para que devuelva respuestas parciales por lotes.
Estrategia de retroceso para invocaciones fallidas
Cuando se produce un error en una invocación, Lambda intenta volver a intentar la invocación mientras implementa una estrategia de retraso. La estrategia de retroceso difiere ligeramente en función de si Lambda encontró el error debido a un error en el código de la función o a una limitación.
-
Si el código de la función provocó el error, Lambda dejará de procesar y volver a intentar la invocación. Mientras tanto, Lambda reduce de manera gradual el volumen de simultaneidad asignada a su asignación de orígenes de eventos de Amazon SQS. Cuando se agote el tiempo de espera de visibilidad de la cola, el mensaje volverá a aparecer en la cola.
-
Si la invocación falla debido a la limitación, Lambda retrocede gradualmente los reintentos reduciendo la cantidad de simultaneidad asignada a la asignación de orígenes de eventos de Amazon SQS. Lambda sigue reintentando enviar el mensaje hasta que la marca de tiempo del mensaje supere el tiempo límite de visibilidad de la cola, momento en el que Lambda descarta el mensaje.
Implementación de respuestas parciales por lotes
Cuando la función Lambda detecta un error al procesar un lote, todos los mensajes de ese lote se vuelven a ver en la cola de forma predeterminada, incluidos los mensajes que Lambda procesó correctamente. Como resultado, la función puede terminar procesando el mismo mensaje varias veces.
Para evitar el reprocesamiento de todos los mensajes procesados con éxito en un lote con errores, puede configurar la asignación de orígenes de eventos para que solo se vean de nuevo los mensajes fallidos. Esto se denomina respuesta parcial por lotes. Para activar las respuestas parciales por lotes, especifique ReportBatchItemFailures
en la acción FunctionResponseTypes cuando configure la asignación de orígenes de eventos. Esto permite que la función devuelva un éxito parcial, lo que puede ayudar a reducir el número de reintentos innecesarios en los registros.
Cuando ReportBatchItemFailures
está activado, Lambda no reduce verticalmente el sondeo de mensajes cuando fallan las invocaciones de las funciones. Utilice ReportBatchItemFailures
, si espera que algunos mensajes fallen y no quiere que esos errores afecten a la velocidad de procesamiento de los mensajes.
Tenga en cuenta las siguientes consideraciones al utilizar las respuestas parciales por lotes:
-
Si la función genera una excepción, todo el lote se considera un error completo.
-
Si utiliza esta característica con una cola FIFO, la función debe dejar de procesar los mensajes después del primer error y devolver todos los mensajes fallidos y sin procesar en batchItemFailures
. Esto ayuda a preservar el orden de los mensajes en la cola.
Para activar los informes parciales por lotes
-
Revise las Prácticas recomendadas para implementar las respuestas parciales por lotes.
-
Ejecute el siguiente comando para activar ReportBatchItemFailures
para la función. Para recuperar el UUID de la asignación de orígenes de eventos, ejecute el comando list-event-source-mappings de la AWS CLI.
aws lambda update-event-source-mapping \
--uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"
\
--function-response-types "ReportBatchItemFailures"
-
Actualice el código de la función para detectar todas las excepciones y devolver los mensajes fallidos en una respuesta JSON batchItemFailures
. La respuesta batchItemFailures
debe incluir una lista de ID de mensajes, como valores JSON itemIdentifier
.
Supongamos que tiene un lote de cinco mensajes con ID de mensaje id1
, id2
, id3
, id4
y id5
. Su función procesa correctamente id1
, id3
y id5
. Para hacer que los mensajes id2
y id4
sean de nuevo visibles en la cola, la función debe devolver la siguiente respuesta:
{
"batchItemFailures": [
{
"itemIdentifier": "id2"
},
{
"itemIdentifier": "id4"
}
]
}
Estos son algunos ejemplos de código de función que devuelven la lista de IDs de mensajes fallidos del lote:
- .NET
-
- AWS SDK for .NET
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace sqsSample;
public class Function
{
public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
{
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
foreach(var message in evnt.Records)
{
try
{
//process your message
await ProcessMessageAsync(message, context);
}
catch (System.Exception)
{
//Add failed message identifier to the batchItemFailures list
batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId});
}
}
return new SQSBatchResponse(batchItemFailures);
}
private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
{
if (String.IsNullOrEmpty(message.Body))
{
throw new Exception("No Body in SQS Message.");
}
context.Logger.LogInformation($"Processed message {message.Body}");
// TODO: Do interesting work based on the new message
await Task.CompletedTask;
}
}
- Go
-
- SDK para Go V2
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
batchItemFailures := []map[string]interface{}{}
for _, message := range sqsEvent.Records {
if /* Your message processing condition here */ {
batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
}
}
sqsBatchResponse := map[string]interface{}{
"batchItemFailures": batchItemFailures,
}
return sqsBatchResponse, nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK para Java 2.x
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import java.util.ArrayList;
import java.util.List;
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
String messageId = "";
for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
try {
//process your message
messageId = message.getMessageId();
} catch (Exception e) {
//Add failed message identifier to the batchItemFailures list
batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId));
}
}
return new SQSBatchResponse(batchItemFailures);
}
}
- JavaScript
-
- SDK para JavaScript (v3)
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante JavaScript.
// Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
export const handler = async (event, context) => {
const batchItemFailures = [];
for (const record of event.Records) {
try {
await processMessageAsync(record, context);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return { batchItemFailures };
};
async function processMessageAsync(record, context) {
if (record.body && record.body.includes("error")) {
throw new Error("There is an error in the SQS Message.");
}
console.log(`Processed message: ${record.body}`);
}
Notificación de los errores de los elementos del lote de SQS con Lambda mediante TypeScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
const batchItemFailures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
try {
await processMessageAsync(record);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return {batchItemFailures: batchItemFailures};
};
async function processMessageAsync(record: SQSRecord): Promise<void> {
if (record.body && record.body.includes("error")) {
throw new Error('There is an error in the SQS Message.');
}
console.log(`Processed message ${record.body}`);
}
- PHP
-
- SDK para PHP
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante PHP.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
use Bref\Context\Context;
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends SqsHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleSqs(SqsEvent $event, Context $context): void
{
$this->logger->info("Processing SQS records");
$records = $event->getRecords();
foreach ($records as $record) {
try {
// Assuming the SQS message is in JSON format
$message = json_decode($record->getBody(), true);
$this->logger->info(json_encode($message));
// TODO: Implement your custom processing logic here
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$this->markAsFailed($record);
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords SQS records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK para Python (Boto3)
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
if event:
batch_item_failures = []
sqs_batch_response = {}
for record in event["Records"]:
try:
# process message
except Exception as e:
batch_item_failures.append({"itemIdentifier": record['messageId']})
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
- Ruby
-
- SDK para Ruby
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'json'
def lambda_handler(event:, context:)
if event
batch_item_failures = []
sqs_batch_response = {}
event["Records"].each do |record|
begin
# process message
rescue StandardError => e
batch_item_failures << {"itemIdentifier" => record['messageId']}
end
end
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
end
end
- Rust
-
- SDK para Rust
-
Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.
Notificación de los errores de los elementos del lote de SQS con Lambda mediante Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
event::sqs::{SqsBatchResponse, SqsEvent},
sqs::{BatchItemFailure, SqsMessage},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn process_record(_: &SqsMessage) -> Result<(), Error> {
Err(Error::from("Error processing message"))
}
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
let mut batch_item_failures = Vec::new();
for record in event.payload.records {
match process_record(&record).await {
Ok(_) => (),
Err(_) => batch_item_failures.push(BatchItemFailure {
item_identifier: record.message_id.unwrap(),
}),
}
}
Ok(SqsBatchResponse {
batch_item_failures,
})
}
#[tokio::main]
async fn main() -> Result<(), Error> {
run(service_fn(function_handler)).await
}
Si los eventos fallidos no vuelven a la cola, consulte ¿Cómo soluciono los problemas de la función de Lambda SQS ReportBatchItemFailures? en el Centro de conocimientos de AWS.
Condiciones de éxito y fracaso
Lambda trata un lote como un éxito completo si la función devuelve cualquiera de los siguientes elementos:
Lambda trata un lote como un error completo si la función devuelve cualquiera de los siguientes elementos:
-
Respuesta JSON no válida
-
Una cadena itemIdentifier
vacía
-
Una itemIdentifier
nula
-
Un itemIdentifier
con un mal nombre de clave
-
Un valor itemIdentifier
con un ID de mensaje que no existe
Métricas de CloudWatch
Para determinar si la función informa correctamente de los errores de elementos por lotes, puede monitorear las métricas Amazon SQS NumberOfMessagesDeleted
y ApproximateAgeOfOldestMessage
en Amazon CloudWatch.
-
NumberOfMessagesDeleted
realiza un seguimiento del número de mensajes eliminados de la cola. Si cae a 0, es una señal de que la respuesta de la función no está devolviendo correctamente los mensajes fallidos.
-
ApproximateAgeOfOldestMessage
hace un seguimiento de cuánto tiempo ha permanecido el mensaje más antiguo en la cola. Un aumento brusco de esta métrica puede indicar que la función no está devolviendo correctamente los mensajes fallidos.