When consuming and processing streaming data from an event source, by default Lambda checkpoints to the highest
sequence number of a batch only when the batch is a complete success. Lambda treats all other results as a complete
failure and retries processing the batch up to the retry limit. To allow for partial successes while processing
batches from a stream, turn on ReportBatchItemFailures
. Allowing partial successes can help to reduce
the number of retries on a record, though it doesn’t entirely prevent the possibility of retries in a successful record.
To turn on ReportBatchItemFailures
, include the enum value
ReportBatchItemFailures
in the FunctionResponseTypes list. This list indicates
which response types are enabled for your function. You can configure this list when you create or update an event source mapping.
Report syntax
When configuring reporting on batch item failures, the StreamsEventResponse
class is returned with a
list of batch item failures. You can use a StreamsEventResponse
object to return the sequence number
of the first failed record in the batch. You can also create your own custom class using the correct response
syntax. The following JSON structure shows the required response syntax:
{
"batchItemFailures": [
{
"itemIdentifier": "<SequenceNumber>"
}
]
}
Note
If the batchItemFailures
array contains multiple items, Lambda uses the record with the lowest
sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.
Success and failure conditions
Lambda treats a batch as a complete success if you return any of the following:
-
An empty
batchItemFailure
list -
A null
batchItemFailure
list -
An empty
EventResponse
-
A null
EventResponse
Lambda treats a batch as a complete failure if you return any of the following:
-
An empty string
itemIdentifier
-
A null
itemIdentifier
-
An
itemIdentifier
with a bad key name
Lambda retries failures based on your retry strategy.
Bisecting a batch
If your invocation fails and BisectBatchOnFunctionError
is turned on, the batch is bisected
regardless of your ReportBatchItemFailures
setting.
When a partial batch success response is received and both BisectBatchOnFunctionError
and
ReportBatchItemFailures
are turned on, the batch is bisected at the returned sequence number and
Lambda retries only the remaining records.
Here are some examples of function code that return the list of failed message IDs in the batch:
- AWS SDK for .NET
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples
repository. Reporting Kinesis batch item failures with Lambda using .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }