Configuración de la respuesta por lotes parcial con DynamoDB y Lambda - AWS Lambda

Configuración de la respuesta por lotes parcial con DynamoDB y Lambda

Al consumir y procesar datos de streaming desde un origen de eventos, de forma predeterminada, Lambda comprueba hasta el número de secuencia más alto de un lote solo cuando el lote se ha completado con éxito. Lambda trata todos los demás resultados como un error completo y vuelve a intentar procesar el lote hasta el límite de reintentos. Para permitir éxitos parciales al procesar lotes de una secuencia, active ReportBatchItemFailures. Permitir éxitos parciales puede ayudar a reducir el número de reintentos en un registro, aunque no impide por completo la posibilidad de reintentos en un registro exitoso.

Para activar ReportBatchItemFailures, incluya el valor enumerado ReportBatchItemFailures en la lista FunctionResponseTypes. Esta lista indica qué tipos de respuesta están habilitados para su función. Puede configurar esta lista al crear o actualizar una asignación de orígenes de eventos.

Sintaxis del informe

Al configurar los informes sobre errores de elementos por lotes, la clase StreamsEventResponse se devuelve con una lista de errores de elementos de lote. Puede utilizar un objeto StreamsEventResponse para devolver el número de secuencia del primer registro fallido del lote. También puede crear su propia clase personalizada usando la sintaxis de respuesta correcta. La siguiente estructura JSON muestra la sintaxis de respuesta requerida:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
nota

Si la matriz batchItemFailures contiene varios elementos, Lambda usa el registro con el número de secuencia más bajo como punto de control. Luego Lambda vuelve a probar todos los registros a partir de ese punto de control.

Condiciones de éxito y fracaso

Lambda trata un lote como un éxito completo si devuelve cualquiera de los siguientes elementos:

  • Una lista batchItemFailure vacía

  • Una lista batchItemFailure nula

  • Una EventResponse vacía

  • Un EventResponse nulo

Lambda trata un lote como un error completo si devuelve cualquiera de los siguientes elementos:

  • Una cadena itemIdentifier vacía

  • Una itemIdentifier nula

  • Un itemIdentifier con un mal nombre de clave

Lambda reintentos fallidos basados en su estrategia de reintento.

Bisecar un lote

Si su invocación falla y BisectBatchOnFunctionError está activada, el lote se divide en bisectos independientemente de su configuración ReportBatchItemFailures.

Cuando se recibe una respuesta de éxito parcial de lote y se activan tanto BisectBatchOnFunctionError como ReportBatchItemFailures, el lote se divide en el número de secuencia devuelto y Lambda vuelve a intentar solo los registros restantes.

Estos son algunos ejemplos de código de función que devuelven la lista de IDs de mensajes fallidos del lote:

.NET
AWS SDK for .NET
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
SDK para Go V2
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
SDK para Java 2.x
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
SDK para JavaScript (v3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante JavaScript.

export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante TypeScript.

import { DynamoDBBatchResponse, DynamoDBBatchItemFailure, DynamoDBStreamEvent, } from "aws-lambda"; export const handler = async ( event: DynamoDBStreamEvent ): Promise<DynamoDBBatchResponse> => { const batchItemFailures: DynamoDBBatchItemFailure[] = []; let curRecordSequenceNumber; for (const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber; if (curRecordSequenceNumber) { batchItemFailures.push({ itemIdentifier: curRecordSequenceNumber, }); } } return { batchItemFailures: batchItemFailures }; };
PHP
SDK para PHP
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante PHP.

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK para Python (Boto3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
SDK para Ruby
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Ruby.

def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
SDK para Rust
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Rust.

use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }