Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Configuration d'une réponse par lots partielle avec DynamoDB et Lambda
Lors de l’utilisation et du traitement de données de streaming à partir d’une source d’événement, par défaut, Lambda n’effectue un point de contrôle sur le numéro de séquence le plus élevé d’un lot que lorsque celui-ci est un succès complet. Lambda traite tous les autres résultats comme un échec complet et recommence à traiter le lot jusqu’à atteindre la limite de nouvelles tentatives. Pour autoriser des succès partiels lors du traitement des lots à partir d’un flux, activez ReportBatchItemFailures
. Autoriser des succès partiels peut permettre de réduire le nombre de nouvelles tentatives sur un enregistrement, mais cela n’empêche pas entièrement la possibilité de nouvelles tentatives dans un enregistrement réussi.
Pour l'activerReportBatchItemFailures
, incluez la valeur enum ReportBatchItemFailures
dans la FunctionResponseTypesliste. Cette liste indique quels types de réponse sont activés pour votre fonction. Vous pouvez configurer cette liste lorsque vous créez ou mettez à jour un mappage de source d'événements.
Syntaxe du rapport
Lors de la configuration des rapports d’échec d’articles de lot, la classe StreamsEventResponse
est renvoyée avec une liste d’échecs d’articles de lot. Vous pouvez utiliser un objet StreamsEventResponse
pour renvoyer le numéro de séquence du premier enregistrement ayant échoué dans le lot. Vous pouvez également créer votre classe personnalisée en utilisant la syntaxe de réponse correcte. La JSON structure suivante indique la syntaxe de réponse requise :
{
"batchItemFailures": [
{
"itemIdentifier": "<SequenceNumber>"
}
]
}
Si le tableau batchItemFailures
contient plusieurs éléments, Lambda utilise l’enregistrement portant le numéro de séquence le plus bas comme point de contrôle. Lambda réessaie ensuite tous les enregistrements à partir de ce point de contrôle.
Conditions de réussite et d’échec
Lambda traite un lot comme un succès complet si vous renvoyez l’un des éléments suivants :
Lambda traite un lot comme un échec complet si vous renvoyez l’un des éléments suivants :
Lambda effectue des nouvelles tentatives en cas d’échec en fonction de votre stratégie de nouvelle tentative.
Diviser un lot
Si votre invocation échoue et que BisectBatchOnFunctionError
est activé, le lot est divisé en deux quel que soit votre paramètre ReportBatchItemFailures
.
Quand une réponse de succès partiel de lot est reçue et que les paramètres BisectBatchOnFunctionError
et ReportBatchItemFailures
sont activés, le lot est divisé au numéro de séquence renvoyé, et Lambda n’effectue de nouvelle tentative que sur les enregistrements restants.
Voici quelques exemples de code de fonction qui renvoie la liste des messages ayant échoué IDs dans le lot :
- .NET
-
- AWS SDK for .NET
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de. NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda_DDB;
public class Function
{
public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
StreamsEventResponse streamsEventResponse = new StreamsEventResponse();
foreach (var record in dynamoEvent.Records)
{
try
{
var sequenceNumber = record.Dynamodb.SequenceNumber;
context.Logger.LogInformation(sequenceNumber);
}
catch (Exception ex)
{
context.Logger.LogError(ex.Message);
batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
}
}
if (batchItemFailures.Count > 0)
{
streamsEventResponse.BatchItemFailures = batchItemFailures;
}
context.Logger.LogInformation("Stream processing complete.");
return streamsEventResponse;
}
}
- Go
-
- SDKpour Go V2
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
type BatchItemFailure struct {
ItemIdentifier string `json:"ItemIdentifier"`
}
type BatchResult struct {
BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}
func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
var batchItemFailures []BatchItemFailure
curRecordSequenceNumber := ""
for _, record := range event.Records {
// Process your record
curRecordSequenceNumber = record.Change.SequenceNumber
}
if curRecordSequenceNumber != "" {
batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
}
batchResult := BatchResult{
BatchItemFailures: batchItemFailures,
}
return &batchResult, nil
}
func main() {
lambda.Start(HandleRequest)
}
- Java
-
- SDKpour Java 2.x
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> {
@Override
public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
String curRecordSequenceNumber = "";
for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
try {
//Process your record
StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
} catch (Exception e) {
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
return new StreamsEventResponse(batchItemFailures);
}
}
return new StreamsEventResponse();
}
}
- JavaScript
-
- SDKpour JavaScript (v3)
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de. JavaScript
export const handler = async (event) => {
const records = event.Records;
let curRecordSequenceNumber = "";
for (const record of records) {
try {
// Process your record
curRecordSequenceNumber = record.dynamodb.SequenceNumber;
} catch (e) {
// Return failed record's sequence number
return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
}
}
return { batchItemFailures: [] };
};
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de. TypeScript
import {
DynamoDBBatchResponse,
DynamoDBBatchItemFailure,
DynamoDBStreamEvent,
} from "aws-lambda";
export const handler = async (
event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
const batchItemFailures: DynamoDBBatchItemFailure[] = [];
let curRecordSequenceNumber;
for (const record of event.Records) {
curRecordSequenceNumber = record.dynamodb?.SequenceNumber;
if (curRecordSequenceNumber) {
batchItemFailures.push({
itemIdentifier: curRecordSequenceNumber,
});
}
}
return { batchItemFailures: batchItemFailures };
};
- PHP
-
- SDK pour PHP
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de. PHP
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler implements StdHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handle(mixed $event, Context $context): array
{
$dynamoDbEvent = new DynamoDbEvent($event);
$this->logger->info("Processing records");
$records = $dynamoDbEvent->getRecords();
$failedRecords = [];
foreach ($records as $record) {
try {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$failedRecords[] = $record->getSequenceNumber();
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
// change format for the response
$failures = array_map(
fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
$failedRecords
);
return [
'batchItemFailures' => $failures
];
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDKpour Python (Boto3)
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler des défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
records = event.get("Records")
curRecordSequenceNumber = ""
for record in records:
try:
# Process your record
curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
except Exception as e:
# Return failed record's sequence number
return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}
return {"batchItemFailures":[]}
- Ruby
-
- SDKpour Ruby
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Ruby.
def lambda_handler(event:, context:)
records = event["Records"]
cur_record_sequence_number = ""
records.each do |record|
begin
# Process your record
cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
rescue StandardError => e
# Return failed record's sequence number
return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
end
end
{"batchItemFailures" => []}
end
- Rust
-
- SDKpour Rust
-
Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.
Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Rust.
use aws_lambda_events::{
event::dynamodb::{Event, EventRecord, StreamRecord},
streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
let stream_record: &StreamRecord = &record.change;
// process your stream record here...
tracing::info!("Data: {:?}", stream_record);
Ok(())
}
/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
let mut response = DynamoDbEventResponse {
batch_item_failures: vec![],
};
let records = &event.payload.records;
if records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(response);
}
for record in records {
tracing::info!("EventId: {}", record.event_id);
// Couldn't find a sequence number
if record.change.sequence_number.is_none() {
response.batch_item_failures.push(DynamoDbBatchItemFailure {
item_identifier: Some("".to_string()),
});
return Ok(response);
}
// Process your record here...
if process_record(record).is_err() {
response.batch_item_failures.push(DynamoDbBatchItemFailure {
item_identifier: record.change.sequence_number.clone(),
});
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return Ok(response);
}
}
tracing::info!("Successfully processed {} record(s)", records.len());
Ok(response)
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}