Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Configurazione della risposta batch parziale con il flusso di dati Kinesis e Lambda
Quando si consumano ed elaborano i dati di streaming da un'origine eventi, per impostazione predefinita i Lambda imposta i checkpoint al numero di sequenza più alto di un batch solo quando il batch è riuscito completamente. Lambda tratta tutti gli altri risultati come un fallimento completo e riprova a elaborare il batch fino al limite di tentativi. Per consentire i successi parziali durante l'elaborazione di batch da un flusso, attivare ReportBatchItemFailures
. Consentire successi parziali può contribuire a ridurre il numero di tentativi su un record, anche se non impedisce del tutto la possibilità di tentativi in un record riuscito.
Per attivare ReportBatchItemFailures
, includere il valore enumReportBatchItemFailures
nell'FunctionResponseTypeselenco. Questo elenco indica quali tipi di risposta sono abilitati per la funzione. È possibile configurare questo elenco quando si crea o si aggiorna uno strumento di mappatura dell'origine degli eventi.
Sintassi di report
Quando si configura la creazione di report sugli errori degli elementi batch, la StreamsEventResponse
classe viene restituita con un elenco di errori degli articoli batch. È possibile utilizzare un StreamsEventResponse
oggetto per restituire il numero di sequenza del primo record non riuscito nel batch. È inoltre possibile creare la propria classe personalizzata utilizzando la sintassi di risposta corretta. La seguente struttura JSON mostra la sintassi di risposta richiesta:
{
"batchItemFailures": [
{
"itemIdentifier": "<SequenceNumber>"
}
]
}
Se l'array batchItemFailures
contiene più elementi, Lambda utilizza il record con il numero di sequenza più basso come checkpoint. Lambda quindi riprova tutti i record a partire da quel checkpoint.
Condizioni di successo e di errore
Lambda considera un batch come un successo completo se si restituisce uno degli elementi seguenti:
-
Una batchItemFailure
lista vuota
-
Un batchItemFailure
elenco nullo
-
Un vuoto EventResponse
-
Un valore nullo EventResponse
Lambda considera un batch come un fallimento completo se si restituisce uno degli elementi seguenti:
-
Una stringa vuota itemIdentifier
-
Un valore nullo itemIdentifier
-
Un itemIdentifier
con un nome chiave errato
Lambda esegue nuovi tentativi in seguito ai fallimenti secondo la strategia di tentativi impostata.
Bisezione un batch
Se il richiamo fallisce ed BisectBatchOnFunctionError
è attivato, il batch viene bisecato a prescindere dalle ReportBatchItemFailures
impostazioni.
Quando si riceve una risposta di successo parziale del batch ed entrambi BisectBatchOnFunctionError
e ReportBatchItemFailures
sono attivati, il batch viene bisecato in corrispondenza del numero di sequenza restituito e Lambda ritenta solo i record rimanenti.
Ecco alcuni esempi di codice di funzione che restituiscono l'elenco dei messaggi IDs non riusciti nel batch:
- .NET
-
- AWS SDK for .NET
-
C'è di più su GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegration;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return new StreamsEventResponse();
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return new StreamsEventResponse
{
BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
{
new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
}
};
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
return new StreamsEventResponse();
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
public class StreamsEventResponse
{
[JsonPropertyName("batchItemFailures")]
public IList<BatchItemFailure> BatchItemFailures { get; set; }
public class BatchItemFailure
{
[JsonPropertyName("itemIdentifier")]
public string ItemIdentifier { get; set; }
}
}
- Go
-
- SDK per Go V2
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
batchItemFailures := []map[string]interface{}{}
for _, record := range kinesisEvent.Records {
curRecordSequenceNumber := ""
// Process your record
if /* Your record processing condition here */ {
curRecordSequenceNumber = record.Kinesis.SequenceNumber
}
// Add a condition to check if the record processing failed
if curRecordSequenceNumber != "" {
batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
}
}
kinesisBatchResponse := map[string]interface{}{
"batchItemFailures": batchItemFailures,
}
return kinesisBatchResponse, nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK per Java 2.x
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {
@Override
public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
String curRecordSequenceNumber = "";
for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
try {
//Process your record
KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
curRecordSequenceNumber = kinesisRecord.getSequenceNumber();
} catch (Exception e) {
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
return new StreamsEventResponse(batchItemFailures);
}
}
return new StreamsEventResponse(batchItemFailures);
}
}
- JavaScript
-
- SDK per JavaScript (v3)
-
C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Javascript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return {
batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
};
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
return { batchItemFailures: [] };
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
Segnalazione di errori relativi agli elementi batch di Kinesis utilizzando Lambda. TypeScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<KinesisStreamBatchResponse> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return {
batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
};
}
}
logger.info(`Successfully processed ${event.Records.length} records.`);
return { batchItemFailures: [] };
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- SDK per PHP
-
C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite PHP.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler implements StdHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handle(mixed $event, Context $context): array
{
$kinesisEvent = new KinesisEvent($event);
$this->logger->info("Processing records");
$records = $kinesisEvent->getRecords();
$failedRecords = [];
foreach ($records as $record) {
try {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$failedRecords[] = $record->getSequenceNumber();
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
// change format for the response
$failures = array_map(
fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
$failedRecords
);
return [
'batchItemFailures' => $failures
];
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK per Python (Boto3)
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
records = event.get("Records")
curRecordSequenceNumber = ""
for record in records:
try:
# Process your record
curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
except Exception as e:
# Return failed record's sequence number
return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}
return {"batchItemFailures":[]}
- Ruby
-
- SDK per Ruby
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
batch_item_failures = []
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue StandardError => err
puts "An error occurred #{err}"
# Since we are working with streams, we can return the failed item immediately.
# Lambda will immediately begin to retry processing from this failed item onwards.
return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
end
end
puts "Successfully processed #{event['Records'].length} records."
{ batchItemFailures: batch_item_failures }
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('utf-8')
# Placeholder for actual async work
sleep(1)
data
end
- Rust
-
- SDK per Rust
-
C'è dell'altro GitHub. Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.
Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
event::kinesis::KinesisEvent,
kinesis::KinesisEventRecord,
streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
let mut response = KinesisEventResponse {
batch_item_failures: vec![],
};
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(response);
}
for record in &event.payload.records {
tracing::info!(
"EventId: {}",
record.event_id.as_deref().unwrap_or_default()
);
let record_processing_result = process_record(record);
if record_processing_result.is_err() {
response.batch_item_failures.push(KinesisBatchItemFailure {
item_identifier: record.kinesis.sequence_number.clone(),
});
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return Ok(response);
}
}
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(response)
}
fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
let record_data = std::str::from_utf8(record.kinesis.data.as_slice());
if let Some(err) = record_data.err() {
tracing::error!("Error: {}", err);
return Err(Error::from(err));
}
let record_data = record_data.unwrap_or_default();
// do something interesting with the data
tracing::info!("Data: {}", record_data);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}