

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 DynamoDB 和 Lambda 設定部分批次回應
<a name="services-ddb-batchfailurereporting"></a>

取用和處理事件來源的串流資料時，依預設，只有在批次成功完成時，Lambda 檢查點才會到批次的最高序號。Lambda 會將所有其他結果視為完全失敗，並重試處理批次，直至達到重試限制。若要在處理串流的批次時允許部分成功，請開啟 `ReportBatchItemFailures`。允許部分成功有助於減少記錄的重試次數，但其不會完全消除在成功記錄中重試的可能性。

若要開啟 `ReportBatchItemFailures`，請在 [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes) 清單中包含枚舉值 **ReportBatchItemFailures**。此清單指示已為您的函數啟用哪些回應類型。您可以在[建立](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)或[更新](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)事件來源映射時設定此清單。

**注意**  
即使函式程式碼傳回了部分批次失敗回應，除非針對事件來源映射明確啟用 `ReportBatchItemFailures` 功能，否則 Lambda 將不會處理這些回應。

## 報告語法
<a name="streams-batchfailurereporting-syntax"></a>

設定批次項目失敗的報告時，會傳回 `StreamsEventResponse` 類別，其中包含批次項目失敗的清單。您可以使用 `StreamsEventResponse` 物件，來傳回批次中第一個失敗記錄的序號。您還可以使用正確的回應語法，建立自己的自訂類別。下列 JSON 結構顯示所需的回應語法：

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**注意**  
如果 `batchItemFailures` 陣列包含多個項目，則 Lambda 會使用具有最低序列號的記錄作為檢查點。然後，Lambda 會重試從該檢查點開始的所有記錄。

## 成功與失敗條件
<a name="streams-batchfailurereporting-conditions"></a>

如果您傳回下列任一項目，Lambda 會將批次視為完全成功：
+ 空白 `batchItemFailure` 清單
+ Null `batchItemFailure` 清單
+ 空白 `EventResponse`
+ Null `EventResponse`

如果您傳回下列任一項目，Lambda 會將批次視為完全失敗：
+ 空白字串 `itemIdentifier`
+ Null `itemIdentifier`
+ 具有錯誤金鑰名稱的 `itemIdentifier`

Lambda 會根據您的重試政策來重試失敗。

## 將批次平分
<a name="streams-batchfailurereporting-bisect"></a>

如果您的調用失敗且 `BisectBatchOnFunctionError` 已開啟，則無論您的 `ReportBatchItemFailures` 設定如何，批次都會被平分。

收到部分批次成功回應且 `BisectBatchOnFunctionError` 和 `ReportBatchItemFailures` 均開啟時，批次會依傳回的序號進行平分，並且 Lambda 僅會重試剩餘的記錄。

為了簡化部分批次回應邏輯的實作，請考慮使用 Powertools 中的 [Batch Processor 公用程式](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) AWS Lambda，其會自動為您處理這些複雜性。

以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單：

------
#### [ .NET ]

**適用於 .NET 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 .NET 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Go 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**適用於 Java 2.x 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Java 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**適用於 JavaScript (v3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
使用 TypeScript 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**適用於 PHP 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 PHP 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**適用於 Python (Boto3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Python 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**適用於 Rust 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Rust 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## 使用 Powertools 進行 AWS Lambda 批次處理器
<a name="services-ddb-batchfailurereporting-powertools"></a>

Powertools for 的批次處理器公用程式 AWS Lambda 會自動處理部分批次回應邏輯，降低實作批次失敗報告的複雜性。以下是使用批次處理器的範例：

**Python**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```