DynamoDB と Lambda を使用した部分的なバッチレスポンスの設定
イベントソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に Lambda チェックポイントが設定されます。Lambda は、他のすべての結果を完全な失敗として扱い、再試行の上限までバッチの処理を再試行します。ストリームからのバッチの処理中に部分的な成功を許可するには、ReportBatchItemFailures
をオンにします 。部分的な成功を許可すると、レコードの再試行回数を減らすことができますが、成功したレコードの再試行の可能性を完全に妨げるわけではありません。
ReportBatchItemFailures
をオンにするには、列挙値 ReportBatchItemFailures
を FunctionResponseTypes リストに含めます。このリストは、関数で有効になっているレスポンスタイプを示します。このリストは、イベントソースマッピングを作成または更新するときに設定できます。
レポートの構文
バッチアイテムの失敗に関するレポートを設定する場合、StreamsEventResponse
クラスはバッチアイテムの失敗のリストとともに返されます。StreamsEventResponse
オブジェクトを使用して、バッチ処理で最初に失敗したレコードのシーケンス番号を返すことができます。また、正しいレスポンスシンタックスを使用して、独自のカスタムクラスを作成することもできます。次の JSON 構造体は、必要な応答構文を示しています。
{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
注記
batchItemFailures
配列に複数の項目が含まれている場合、Lambda はシーケンス番号が最も小さいレコードをチェックポイントとして使用します。その後、Lambda はそのチェックポイントからすべてのレコードを再試行します。
成功条件と失敗の条件
次のいずれかを返すと、Lambda はバッチを完全な成功として処理します:
-
空の
batchItemFailure
リストです。 -
null の
batchItemFailure
リスト -
空の
EventResponse
-
ヌル
EventResponse
次のいずれかを返すと、Lambda はバッチを完全な失敗として処理します:
-
空の文字列
itemIdentifier
-
ヌル
itemIdentifier
-
itemIdentifier
間違えているキー名
Lambda は、再試行戦略に基づいて失敗を再試行します。
バッチを2分割します
呼び出しが失敗し、BisectBatchOnFunctionError
オンになっている場合、バッチはReportBatchItemFailures
設定に関係なく2分割されます。
部分的なバッチ成功レスポンスを受信し、BisectBatchOnFunctionError
と ReportBatchItemFailures
の両方がオンになっている場合、バッチは返されたシーケンス番号で 2 分割され、Lambda は残りのレコードのみを再試行します。
バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。
- .NET
-
- AWS SDK for .NET
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 .NET を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
- Go
-
- SDK for Go V2
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Go を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
- Java
-
- SDK for Java 2.x
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Java を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
- JavaScript
-
- SDK for JavaScript (v3)
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 JavaScript を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };
TypeScript を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
import { DynamoDBBatchResponse, DynamoDBBatchItemFailure, DynamoDBStreamEvent, } from "aws-lambda"; export const handler = async ( event: DynamoDBStreamEvent ): Promise<DynamoDBBatchResponse> => { const batchItemFailures: DynamoDBBatchItemFailure[] = []; let curRecordSequenceNumber; for (const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber; if (curRecordSequenceNumber) { batchItemFailures.push({ itemIdentifier: curRecordSequenceNumber, }); } } return { batchItemFailures: batchItemFailures }; };
- PHP
-
- SDK for PHP
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 PHP を使用した Lambda での DynamoDB バッチ項目失敗のレポート。
<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Python を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
- Ruby
-
- SDK for Ruby
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Ruby を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
- Rust
-
- SDK for Rust
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 Rust を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。
use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }