

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Lambda 搭配 Amazon DynamoDB 使用
<a name="with-ddb"></a>

**注意**  
如要將資料傳送到 Lambda 函數以外的目標，或在傳送資料之前讓資料更豐富，請參閱 [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)。

您可以使用 AWS Lambda 函數來處理 [Amazon DynamoDB 串流](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)中的記錄。您可以透過 DynamoDB Streams，以在每次更新 DynamoDB 資料表時，觸發 Lambda 函數來執行額外的工作。

處理 DynamoDB 串流時，您需要實作部分批次回應邏輯，避免批次中部分記錄處理失敗時，已成功處理的記錄被重新執行。適用於 的 Powertools 的 [Batch Processor 公用程式](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) AWS Lambda 可在 Python、TypeScript、.NET 和 Java 中使用，並透過自動處理部分批次回應邏輯、縮短開發時間並改善可靠性來簡化此實作。

**Topics**
+ [輪詢和批次處理串流](#dynamodb-polling-and-batching)
+ [輪詢和串流開始位置](#dyanmo-db-stream-poll)
+ [DynamoDB Streams 中的碎片同時讀取](#events-dynamodb-simultaneous-readers)
+ [範例事件](#events-sample-dynamodb)
+ [使用 Lambda 處理 DynamoDB 記錄](services-dynamodb-eventsourcemapping.md)
+ [使用 DynamoDB 和 Lambda 設定部分批次回應](services-ddb-batchfailurereporting.md)
+ [在 Lambda 中保留 DynamoDB 事件來源的捨棄記錄](services-dynamodb-errors.md)
+ [在 Lambda 中實作有狀態的 DynamoDB 串流處理](services-ddb-windows.md)
+ [Amazon DynamoDB 事件來源映射的 Lambda 參數](services-ddb-params.md)
+ [搭配 DynamoDB 事件來源使用事件篩選](with-ddb-filtering.md)
+ [教學課程： AWS Lambda 搭配 Amazon DynamoDB 串流使用](with-ddb-example.md)

## 輪詢和批次處理串流
<a name="dynamodb-polling-and-batching"></a>

Lambda 會輪詢您 DynamoDB 串流中的碎片，其記錄的基本速率為每秒 4 次。當記錄可用時，Lambda 會調用您的函數，並等待結果。如果處理成功，Lambda 會恢復輪詢，直到收到多筆記錄。

Lambda 預設會在記錄可用時立即調用函數。如果 Lambda 從事件來源中讀取的批次只有一筆記錄，Lambda 只會傳送一筆記錄至函數。為避免調用具有少量記錄的函數，您可設定*批次間隔*，請求事件來源緩衝記錄最長達五分鐘。調用函數之前，Lambda 會繼續從事件來源中讀取記錄，直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。如需詳細資訊，請參閱[批次處理行為](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

**警告**  
Lambda 事件來源映射至少會處理每個事件一次，而且可能會重複處理記錄。為避免與重複事件相關的潛在問題，強烈建議您讓函數程式碼具有等冪性。若要進一步了解，請參閱 AWS 知識中心中的[如何使 Lambda 函數具有等冪性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Lambda 在傳送下一批進行處理前不會等待任何已設定的[擴充](lambda-extensions.md)完成。換句話說，當 Lambda 處理下一批記錄時，您的擴充功能可能會繼續執行。如果您違反任何帳戶的 [並行](lambda-concurrency.md) 設定或限制，便可能會產生限流的問題。若要偵測此是否為潛在問題，請監控您的函數，並確認您看到的 [並行指標](monitoring-concurrency.md#general-concurrency-metrics) 是否高於事件來源映射的預期值。由於兩次調用之間的時間很短，Lambda 可能會短暫報告比碎片數目更高的並行用量。即使對於沒有延伸項目的 Lambda 函數也可能如此。

設定 [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 設定來同時處理 DynamoDB 資料串流的一個碎片與多個 Lambda 調用。您可以透過從 1 (預設) 到 10 的並行化因子指定 Lambda 從碎片輪詢的並行批次數。例如，當 `ParallelizationFactor` 設定為 2 時，您最多可以有 200 個並行 Lambda 調用，來處理 100 個 DynamoDB 串流碎片 (不過在實務中，`ConcurrentExecutions` 指標可能有不同值)。當資料量急劇波動並且 [IteratorAge](monitoring-metrics-types.md#performance-metrics) 較高時，這有助於向上擴展處理輸送量。如果增加每個碎片的並行批次數量，Lambda 仍會確保在項目 (分割區和排序索引鍵) 層級進行依序處理。

## 輪詢和串流開始位置
<a name="dyanmo-db-stream-poll"></a>

請注意，建立和更新事件來源映射期間的串流輪詢最終會一致。
+ 在建立事件來源映射期間，從串流開始輪詢事件可能需要幾分鐘時間。
+ 在更新事件來源映射期間，從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這種行為表示如果您指定 `LATEST` 當作串流的開始位置，事件來源映射可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件，請將串流開始位置指定為 `TRIM_HORIZON`。

## DynamoDB Streams 中的碎片同時讀取
<a name="events-dynamodb-simultaneous-readers"></a>

對於不是全域資料表的單一區域資料表，您最多可以設計兩個 Lambda 函數，以便同時讀取同一個 DynamoDB Streams 碎片。超過此限制會導致請求限流。對於全域資料表，建議您將同時函數的數量限制為一個，以避免請求限流。

## 範例事件
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# 使用 Lambda 處理 DynamoDB 記錄
<a name="services-dynamodb-eventsourcemapping"></a>

建立事件來源映射，指示 Lambda 從您的串流傳送記錄至 Lambda 函數。您可以建立多個事件來源映射，來使用多個 Lambda 函數處理相同資料，或使用單一函數處理來自多個串流的項目。

您可以設定事件來源映射，以處理來自不同 AWS 帳戶中的串流的記錄。如需詳細資訊，請參閱 [建立跨帳戶事件來源映射](#services-dynamodb-eventsourcemapping-cross-account)。

若要將函數設定為從 DynamoDB Streams 讀取，請將 [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) AWS 受管政策連接至您的執行角色，然後建立 **DynamoDB** 觸發。

**若要新增許可並建立觸發條件**

1. 開啟 Lambda 主控台中的 [函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇函數的名稱。

1. 依序選擇 **Configuration** (組態) 索引標籤和 **Permissions** (許可)。

1. 在**角色名稱**下面，選擇執行角色連結。此連結會在 IAM 主控台中開啟該角色。  
![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/execution-role.png)

1. 選擇**新增許可**，然後選擇**連接政策**。  
![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/attach-policies.png)

1. 在搜尋欄位中輸入 `AWSLambdaDynamoDBExecutionRole`。將此政策新增至您的執行角色。這是 AWS 受管政策，其中包含函數從 DynamoDB 串流讀取所需的許可。如需此政策的詳細資訊，請參閱《AWS 受管政策參考》**中的 [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html)。

1. 在 Lambda 主控台中返回您的 Lambda 函數。在**函數概觀**下，選擇**新增觸發條件**。  
![\[\]](http://docs.aws.amazon.com/zh_tw/lambda/latest/dg/images/add-trigger.png)

1. 選擇觸發條件類型。

1. 設定需要的選項，然後選擇**新增**。

Lambda 支援 DynamoDB 事件來源的下列選項：

**事件來源選項**
+ **DynamoDB 資料表** - 從中讀取記錄的 DynamoDB 資料表。
+ **批次大小** – 每個批次中要傳送至函數的記錄數量，最高為 10,000。Lambda 會將批次中所有記錄以單一呼叫傳送至函數，前提是事件的總大小不超過同步調用的[酬載限制](gettingstarted-limits.md) (6 MB)。
+ **批次間隔** - 指定調用函數前收集記錄的最長時間 (秒)。
+ **開始位置** - 只處理新記錄，或所有現有的記錄。
  + **最新** - 處理已新增到串流的記錄。
  + **水平修剪** - 處理所有在串流中的記錄。

  處理任何現有的記錄後，該函式已跟上進度並持續處理新的記錄。
+ **故障目的地** - 用於無法處理之記錄的標準 SQS 佇列或標準 SNS 主題。當 Lambda 捨棄太舊或已耗盡所有重試的一批記錄時，Lambda 會將該批次的詳細資料傳送至此佇列或主題。
+ **重試嘗試** - 當函數傳回錯誤時，Lambda 重試的次數上限。這不適用於服務錯誤或調節，其中批次並沒有到達函數。
+ **記錄最大存留期** - Lambda 傳送至函數之記錄的最大存留期。
+ **在錯誤時分割批次** - 當函數傳回錯誤時，先將批次分割為兩個，再進行重試。您原始的批次大小設定仍會維持不變。
+ **每個碎片的並行批次** - 同時處理來自同一個碎片的多個批次。
+ **已啟用** - 設定為 true 可啟用事件來源映射。設定為 false 以停止處理記錄。Lambda 會追蹤上次處理的進度，並在重新啟用映射時從該時間點恢復處理。

**注意**  
對於由 DynamoDB 觸發條件中的 Lambda 調用的 GetRecords API 呼叫，您不需要付費。

若要稍後管理事件來源的組態，請選擇設計工具中的觸發。

## 建立跨帳戶事件來源映射
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

Amazon DynamoDB 現在支援以[資源為基礎的政策](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html)。使用此功能，您可以在另一個帳戶中 AWS 帳戶 使用 Lambda 函數處理來自 DynamoDB 串流的資料。

若要使用不同 DynamoDB 串流為您的 Lambda 函數建立事件來源映射 AWS 帳戶，您必須使用資源型政策來設定串流，以授予 Lambda 函數讀取記錄的許可。若要了解如何設定串流以進行跨帳戶存取，請參閱《*Amazon DynamoDB 開發人員指南*》中的[使用跨帳戶 Lambda 函數共用存取權](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access)。

使用為 Lambda 函數提供必要許可的資源型政策設定串流後，請使用跨帳戶串流 ARN 建立事件來源映射。您可以在跨帳戶 DynamoDB 主控台的資料表**匯出和串流**索引標籤下找到串流 ARN。

使用 Lambda 主控台時，請將串流 ARN 直接貼到事件來源映射建立頁面的 DynamoDB 資料表輸入欄位。

 **注意：**不支援跨區域觸發。

# 使用 DynamoDB 和 Lambda 設定部分批次回應
<a name="services-ddb-batchfailurereporting"></a>

取用和處理事件來源的串流資料時，依預設，只有在批次成功完成時，Lambda 檢查點才會到批次的最高序號。Lambda 會將所有其他結果視為完全失敗，並重試處理批次，直至達到重試限制。若要在處理串流的批次時允許部分成功，請開啟 `ReportBatchItemFailures`。允許部分成功有助於減少記錄的重試次數，但其不會完全消除在成功記錄中重試的可能性。

若要開啟 `ReportBatchItemFailures`，請在 [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes) 清單中包含枚舉值 **ReportBatchItemFailures**。此清單指示已為您的函數啟用哪些回應類型。您可以在[建立](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)或[更新](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)事件來源映射時設定此清單。

**注意**  
即使函式程式碼傳回了部分批次失敗回應，除非針對事件來源映射明確啟用 `ReportBatchItemFailures` 功能，否則 Lambda 將不會處理這些回應。

## 報告語法
<a name="streams-batchfailurereporting-syntax"></a>

設定批次項目失敗的報告時，會傳回 `StreamsEventResponse` 類別，其中包含批次項目失敗的清單。您可以使用 `StreamsEventResponse` 物件，來傳回批次中第一個失敗記錄的序號。您還可以使用正確的回應語法，建立自己的自訂類別。下列 JSON 結構顯示所需的回應語法：

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**注意**  
如果 `batchItemFailures` 陣列包含多個項目，則 Lambda 會使用具有最低序列號的記錄作為檢查點。然後，Lambda 會重試從該檢查點開始的所有記錄。

## 成功與失敗條件
<a name="streams-batchfailurereporting-conditions"></a>

如果您傳回下列任一項目，Lambda 會將批次視為完全成功：
+ 空白 `batchItemFailure` 清單
+ Null `batchItemFailure` 清單
+ 空白 `EventResponse`
+ Null `EventResponse`

如果您傳回下列任一項目，Lambda 會將批次視為完全失敗：
+ 空白字串 `itemIdentifier`
+ Null `itemIdentifier`
+ 具有錯誤金鑰名稱的 `itemIdentifier`

Lambda 會根據您的重試政策來重試失敗。

## 將批次平分
<a name="streams-batchfailurereporting-bisect"></a>

如果您的調用失敗且 `BisectBatchOnFunctionError` 已開啟，則無論您的 `ReportBatchItemFailures` 設定如何，批次都會被平分。

收到部分批次成功回應且 `BisectBatchOnFunctionError` 和 `ReportBatchItemFailures` 均開啟時，批次會依傳回的序號進行平分，並且 Lambda 僅會重試剩餘的記錄。

為了簡化部分批次回應邏輯的實作，請考慮使用 Powertools 中的 [Batch Processor 公用程式](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) AWS Lambda，其會自動為您處理這些複雜性。

以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單：

------
#### [ .NET ]

**適用於 .NET 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 .NET 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Go 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**適用於 Java 2.x 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Java 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**適用於 JavaScript (v3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
使用 TypeScript 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**適用於 PHP 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 PHP 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**適用於 Python (Boto3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Python 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**適用於 Rust 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)儲存庫中設定和執行。
使用 Rust 搭配 Lambda 報告 DynamoDB 批次項目失敗。  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## 使用 Powertools 進行 AWS Lambda 批次處理器
<a name="services-ddb-batchfailurereporting-powertools"></a>

Powertools for 的批次處理器公用程式 AWS Lambda 會自動處理部分批次回應邏輯，降低實作批次失敗報告的複雜性。以下是使用批次處理器的範例：

**Python**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
如需完整的範例和設定說明，請參閱 [batch processor documentation](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/)。
使用 AWS Lambda 批次處理器處理 DynamoDB 串流記錄。  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# 在 Lambda 中保留 DynamoDB 事件來源的捨棄記錄
<a name="services-dynamodb-errors"></a>

DynamoDB 事件來源映射的錯誤處理取決於錯誤是在調用函數之前還是在調用函數期間發生：
+ **調用前：**如果 Lambda 事件來源映射因為限流或其他問題而無法調用函數，它會重試，直到記錄過期或超過事件來源映射上設定的最長存留期 ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds))。
+ **在調用期間：**如果調用函數但傳回錯誤，Lambda 會重試，直到記錄過期、超過最長存留期 ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds))，或達到設定的重試配額 ([MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts))。對於函數錯誤，您也可以設定 [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError)，將失敗的批次分割為兩個較小的批次，隔離錯誤的記錄並避免逾時。分割批次不會消耗重試配額。

如果錯誤處理措施失敗，Lambda 會捨棄相應記錄，並繼續處理串流中的批次。使用預設設定時，這表示不良的記錄可能會封鎖受影響碎片上的處理長達一天。若要避免此情況，在設定函數的事件來源映射時，請使用合理的重試次數和符合您使用案例的記錄最大保留期。

## 設定失敗調用的目的地
<a name="dynamodb-on-failure-destination-console"></a>

若要保留失敗的事件來源映射調用記錄，請將目標地新增到函數的事件來源映射中。傳送至該目的地的每筆記錄都是包含失敗調用之中繼資料的 JSON 文件。對於 Amazon S3 目的地，Lambda 還會將整個調用記錄與中繼資料一起傳送。您可以設定任何 Amazon SNS 主題、Amazon SQS 佇列、Amazon S3 儲存貯體或 Kafka 做為目的地。

透過 Amazon S3 目的地，您可以使用 [Amazon S3 事件通知](https://docs.aws.amazon.com/)功能，在物件上傳至您的目的地 S3 儲存貯體時接收通知。您也可以設定 S3 事件通知來調用另一個 Lambda 函數，以對失敗的批次執行自動處理。

執行角色必須具有目的地的許可：
+ **對於 SQS 目的地：**[sqs：SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **對於 SNS 目的地：**[sns：Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **對於 S3 目的地：**[s3：PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 和 [s3：ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **對於 Kafka 目的地：**[kafka-cluster：WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

您可以將 Kafka 主題設定為 Kafka 事件來源映射的失敗時目的地。當 Lambda 在耗盡重試嘗試後或記錄超過最長存留期後無法處理記錄時，Lambda 會將失敗的記錄傳送至指定的 Kafka 主題，以供稍後處理。請參閱[使用 Kafka 主題做為失敗時的目的地](kafka-on-failure-destination.md)。

如果您已使用自己的 KMS 金鑰為 S3 目的地啟用加密，則函數的執行角色也必須具有呼叫 [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) 的許可。如果 KMS 金鑰和 S3 儲存貯體目的地與 Lambda 函數和執行角色位於不同的帳戶中，請將 KMS 金鑰設定為信任執行角色以允許 kms:GenerateDataKey。

若要使用主控台設定失敗時的目的地，請依照下列步驟執行：

1. 開啟 Lambda 主控台中的 [函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇一個函數。

1. 在**函數概觀下**，選擇**新增目的地**。

1. 針對**來源**，請選擇**事件來源映射調用**。

1. 對於**事件來源映射**，請選擇針對此函數設定的事件來源。

1. 對於**條件**，選取**失敗時**。對於事件來源映射調用，這是唯一可接受的條件。

1. 對於**目標類型**，請選擇 Lambda 將調用記錄傳送至的目標類型。

1. 對於**目的地**，請選擇一個資源。

1. 選擇**儲存**。

您也可以使用 AWS Command Line Interface () 設定失敗時的目的地AWS CLI。例如，下列 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令會將具有 SQS 失敗時的目的地的事件來源映射新增至 `MyFunction`：

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

下列 [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令會更新事件來源映射，以在嘗試兩次重試後，或在記錄超過一小時時，將失敗的調用記錄傳送至 SNS 目的地。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

系統會以非同步的方式套用更新的設定，在處理完成之前不會反映在輸出中。使用 [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) 命令檢視目前的狀態。

若要移除目的地，請提供空白字串作為 `destination-config` 參數的引數：

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 目的地的安全最佳實務
<a name="ddb-s3-destination-security"></a>

刪除已設定為目的地的 S3 儲存貯體而不從函數的組態中移除目的地，可能會產生安全風險。如果其他使用者知道目的地儲存貯體的名稱，他們可以在其 AWS 帳戶中重新建立儲存貯體。失敗調用的記錄會被傳送到其儲存貯體，可能公開來自您函數的資料。

**警告**  
為了確保無法將來自函數的調用記錄傳送到另一個 中的 S3 儲存貯體 AWS 帳戶，請將條件新增至函數的執行角色，以限制您帳戶中儲存貯體的`s3:PutObject`許可。

下列範例顯示的 IAM 政策，將函數的 `s3:PutObject` 許可限制為帳戶中的儲存貯體。此政策也為 Lambda 提供了使用 S3 儲存貯體做為目的地所需的 `s3:ListBucket` 許可。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

若要使用 AWS 管理主控台 或 將許可政策新增至函數的執行角色 AWS CLI，請參閱下列程序中的指示：

------
#### [ Console ]

**若要將許可政策新增至函數的執行角色 (主控台)**

1. 開啟 Lambda 主控台中的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選取您要修改其執行角色的 Lambda 函數。

1. 在**組態**索引標籤中，選擇**許可**。

1. 在**執行角色**索引標籤中，選取函數的**角色名稱**，以開啟角色的 IAM 主控台頁面。

1. 透過下列步驟將許可政策新增至角色：

   1. 在**許可政策**窗格中，選擇**新增許可** ，然後選取**建立內嵌政策**。

   1. 在**政策編輯器**中，選取 **JSON**。

   1. 將您要新增的政策貼入編輯器 (取代現有的 JSON)，然後選擇**下一步**。

   1. 在**政策詳細資訊**下，輸入**政策名稱**。

   1. 選擇**建立政策**。

------
#### [ AWS CLI ]

**若要將許可政策新增至函數的執行角色 (CLI)**

1. 建立具有所需許可的 JSON 政策文件，並將其儲存在本機目錄中。

1. 使用 IAM `put-role-policy` CLI 命令，將許可新增至函數的執行角色。從您儲存 JSON 政策文件的目錄執行下列命令，並將角色名稱、政策名稱和政策文件取代為您自己的值。

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Amazon SNS 和 Amazon SQS 調用記錄範例
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

下列範例顯示了 Lambda 傳送至 DynamoDB 串流的 SQS 或 SNS 目的地的一條調用記錄。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

您可以使用此資訊來從串流擷取受影響的記錄，以進行疑難排解。實際的記錄不包含在內，因此您必須處理此記錄，並在因過期而遺失之前從資料串流中擷取它們。

### Amazon S3 調用記錄範例
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

下列範例顯示了 Lambda 傳送至 DynamoDB 串流的 S3 儲存貯體目的地的一條調用記錄。除了上一個 SQS 和 SNS 目的地範例中的所有欄位之外，此 `payload` 欄位還包含原始調用記錄做為逸出 JSON 字串。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

包含調用記錄的 S3 物件使用以下命名慣例：

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# 在 Lambda 中實作有狀態的 DynamoDB 串流處理
<a name="services-ddb-windows"></a>

Lambda 函數可執行持續串流處理應用程式。串流表示持續在應用程式中流動的無限制資料。若要分析此持續更新輸入中的資訊，您可以使用定義的時段來限制包含的記錄。

輪轉時段是定期開啟和關閉的不同時段。依預設，Lambda 調用是無狀態的，您無法在沒有外部資料庫的情況下，將其用於處理多個持續調用的資料。然而，使用輪轉時段，您可以在不同的調用間維護狀態。此狀態包含之前為目前時段處理之訊息的彙總結果。狀態可以是每個分區最多 1 MB。如果超過該大小，則 Lambda 會提前終止時段。

串流中的每個記錄都屬於一個特定時段。Lambda 至少會處理一次每筆記錄，但不保證每筆記錄只會處理一次。在極少數情況下，例如錯誤處理，某些記錄可能會處理多次。第一次時一律會依序處理記錄。如果多次處理記錄，則可能不會按順序處理。

## 彙總與處理
<a name="streams-tumbling-processing"></a>

調用您的使用者管理函數進行彙總，以及處理該彙總的最終結果。Lambda 會彙總時段中接收的所有記錄。您可以在多個批次中接收這些記錄，各自作為單獨的調用。每次調用會收到一個狀態。因此，當使用輪轉時段時，您的 Lambda 函數回應必須包含 `state` 屬性。如果回應不包含 `state` 屬性，Lambda 會將此視為失敗的調用。為了滿足此條件，您的函數可以返回一個 `TimeWindowEventResponse` 物件，它具有下列 JSON 形狀：

**Example `TimeWindowEventResponse` 值**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**注意**  
對於 Java 函數，我們建議使用 `Map<String, String>` 來表示狀態。

在時段結束時，標記 `isFinalInvokeForWindow` 會設定為 `true` 以指示這是最終狀態，並且可隨時進行處理。處理完成後，時段結束並完成最終調用，然後丟棄該狀態。

在時段結束時，Lambda 會針對彙總結果上的動作使用最終處理。您的最終處理將同步調用。成功調用後，您的函數檢查點序號和串流處理將會繼續。如果調用失敗，則您的 Lambda 函數會暫停進一步處理，直至成功調用。

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Configuration
<a name="streams-tumbling-config"></a>

您可以在建立或更新事件來源對映時設定輪轉時段。若要設定輪轉時段，請以秒為單位指定時段 ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds))。下列 example AWS Command Line Interface (AWS CLI) 命令會建立輪轉時段為 120 秒的串流事件來源映射。針對彙總與處理定義的 Lambda 函數命名為 `tumbling-window-example-function`。

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda 根據記錄插入串流的時間，確定輪轉時段邊界。所有記錄都有 Lambda 在邊界確定中使用的近似時間戳記。

輪轉時段彙總不支援重新分區。分區結束後，Lambda 會考慮關閉時段，並且子分區會以新的狀態開始自己的時段。

輪轉時段完全支援現有的重試政策 `maxRetryAttempts` 和 `maxRecordAge`。

**Example Handler.py - 彙總與處理**  
下列 Python 函數示範了如何彙總，然後處理您的最終狀態：  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Amazon DynamoDB 事件來源映射的 Lambda 參數
<a name="services-ddb-params"></a>

所有 Lambda 事件來源類型都會共用相同的 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 和 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 操作。但是，只有一些參數適用於 DynamoDB Streams。


| 參數 | 必要 | 預設 | 備註 | 
| --- | --- | --- | --- | 
|  BatchSize  |  否  |  100  |  上限：10,000  | 
|  BisectBatchOnFunctionError  |  N  |  false  | 無  | 
|  DestinationConfig  |  N  | N/A  |  捨棄記錄的標準 Amazon SQS 佇列或標準 Amazon SNS 主題目的地  | 
|  已啟用  |  N  |  true  | 無  | 
|  EventSourceArn  |  Y  | N/A |  資料串流或串流消費者的 ARN  | 
|  FilterCriteria  |  N  | N/A  |  [控制 Lambda 將哪些事件傳送至您的函數](invocation-eventfiltering.md)  | 
|  FunctionName  |  是  | N/A  | 無  | 
|  FunctionResponseTypes  |  N  | N/A |  若要讓函數報告批次中的特定失敗，請將值 `ReportBatchItemFailures` 包含在 `FunctionResponseTypes` 中。如需詳細資訊，請參閱[使用 DynamoDB 和 Lambda 設定部分批次回應](services-ddb-batchfailurereporting.md)。  | 
|  MaximumBatchingWindowInSeconds  |  N  |  0  | 無  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1  |  -1 代表無限：系統會重試失敗的記錄，直到記錄過期為止。[DynamoDB Streams 的資料保留限制](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention)為 24 小時。 下限：-1 上限：604,800  | 
|  MaximumRetryAttempts  |  N  |  -1  |  -1 代表無限：系統會重試失敗的記錄，直到記錄過期為止 下限：0 上限：10,000  | 
|  ParallelizationFactor  |  N  |  1  |  上限：10  | 
|  StartingPosition  |  Y  | N/A  |  TRIM\$1HORIZON 或 LATEST  | 
|  TumblingWindowInSeconds  |  N  | N/A  |  下限：0 上限：900  | 

# 搭配 DynamoDB 事件來源使用事件篩選
<a name="with-ddb-filtering"></a>

您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選運作方式的一般資訊，請參閱[控制 Lambda 將哪些事件傳送至您的函數](invocation-eventfiltering.md)。

本節重點介紹 DynamoDB 事件來源的事件篩選。

**注意**  
DynamoDB 事件來源映射僅支援依據 `dynamodb` 鍵進行篩選。

**Topics**
+ [DynamoDB 事件](#filtering-ddb)
+ [使用資料表屬性進行篩選](#filtering-ddb-attributes)
+ [使用布林表達式進行篩選](#filtering-ddb-boolean)
+ [使用 Exists 運算子](#filtering-ddb-exists)
+ [DynamoDB 篩選的 JSON 格式](#filtering-ddb-JSON-format)

## DynamoDB 事件
<a name="filtering-ddb"></a>

假設您有一個包含主索引鍵 `CustomerName` 和屬性 `AccountManager` 與 `PaymentTerms` 的 DynamoDB 表格。以下顯示 DynamoDB 表格串流中的範例記錄。

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

若要根據 DynamoDB 表格中的索引鍵值和屬性值進行篩選，請使用記錄中的 `dynamodb` 索引鍵。下列各節提供不同的篩選條件類型範例。

### 使用資料表索引進行篩選
<a name="filtering-ddb-keys"></a>

假設您希望函數僅處理主索引鍵 `CustomerName` 為 "AnyCompany Industries" 的記錄。`FilterCriteria` 物件如下所示。

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

補充說明，此處是篩選條件的 `Pattern` 在純文字 JSON 中擴展的值。

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。

------
#### [ Console ]

若要使用主控台新增此篩選條件，請遵循 [將篩選條件標準連接至事件來源映射 (主控台)](invocation-eventfiltering.md#filtering-console) 中的指示，並針對**篩選條件標準**輸入下列字串。

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

若要使用 AWS Command Line Interface (AWS CLI) 來建立具有這些篩選條件標準的新事件來源映射，請執行下列命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

若要將這些篩選條件標準新增到現有事件來源映射，請執行下列命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

若要使用 AWS SAM 新增此篩選條件，請將下列程式碼片段新增到事件來源的 YAML 範本。

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## 使用資料表屬性進行篩選
<a name="filtering-ddb-attributes"></a>

使用 DynamoDB 時，您也可以使用 `NewImage` 和 `OldImage` 索引鍵來篩選屬性值。假設您想篩選最新表格映像中的 `AccountManager` 屬性為 "Pat Candella" 或 "Shirley Rodriguez" 的記錄。`FilterCriteria` 物件如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

補充說明，此處是篩選條件的 `Pattern` 在純文字 JSON 中擴展的值。

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。

------
#### [ Console ]

若要使用主控台新增此篩選條件，請遵循 [將篩選條件標準連接至事件來源映射 (主控台)](invocation-eventfiltering.md#filtering-console) 中的指示，並針對**篩選條件標準**輸入下列字串。

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

若要使用 AWS Command Line Interface (AWS CLI) 來建立具有這些篩選條件標準的新事件來源映射，請執行下列命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

若要將這些篩選條件標準新增到現有事件來源映射，請執行下列命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

若要使用 AWS SAM 新增此篩選條件，請將下列程式碼片段新增到事件來源的 YAML 範本。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## 使用布林表達式進行篩選
<a name="filtering-ddb-boolean"></a>

您也可以使用布林值 AND 運算式建立篩選條件。這些運算式可以同時包含資料表的索引鍵和屬性參數。假設您想篩選記錄，其中 `AccountManager` 的 `NewImage` 值是「Pat Candella」，`OldImage` 值是「Terry Whitlock」。`FilterCriteria` 物件如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

補充說明，此處是篩選條件的 `Pattern` 在純文字 JSON 中擴展的值。

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

您可以使用主控台、AWS CLI 或 AWS SAM 範本新增篩選條件。

------
#### [ Console ]

若要使用主控台新增此篩選條件，請遵循 [將篩選條件標準連接至事件來源映射 (主控台)](invocation-eventfiltering.md#filtering-console) 中的指示，並針對**篩選條件標準**輸入下列字串。

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

若要使用 AWS Command Line Interface (AWS CLI) 來建立具有這些篩選條件標準的新事件來源映射，請執行下列命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

若要將這些篩選條件標準新增到現有事件來源映射，請執行下列命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

若要使用 AWS SAM 新增此篩選條件，請將下列程式碼片段新增到事件來源的 YAML 範本。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**注意**  
DynamoDB 事件篩選不支援使用數值運算子 (數值等於和數值範圍)。即使資料表中的項目儲存為數字，這些參數也會轉換為 JSON 記錄物件中的字串。

## 使用 Exists 運算子
<a name="filtering-ddb-exists"></a>

由於來自 DynamoDB 的 JSON 事件物件的構造方式，使用 Exists 運算子需要特別注意。Exists 運算子只能在事件 JSON 的分葉節點上運作，因此如果您的篩選模式使用 Exists 來測試中繼節點，則無法運作。請考慮下列 DynamoDB 資料表項目。

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

您可能想要建立如下所示的篩選條件模式，以測試包含 `"Organizations"` 的事件：

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

不過，此篩選條件模式永遠不會傳回相符項目，因為 `"Organizations"` 不是分葉節點。下列範例顯示了如何正確使用 Exists 運算子來建構所需的篩選條件模式：

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## DynamoDB 篩選的 JSON 格式
<a name="filtering-ddb-JSON-format"></a>

若要正確篩選來自 DynamoDB 來源的事件，資料欄位和資料欄位 (`dynamodb`) 的篩選條件標準都必須是有效的 JSON 格式。如果其中一個欄位不是有效的 JSON 格式，則 Lambda 會捨棄訊息或擲回例外狀況。下表摘要說明特定行為：


| 傳入資料格式 | 資料屬性的篩選條件模式格式 | 產生的動作 | 
| --- | --- | --- | 
|  有效的 JSON  |  有效的 JSON  |  根據您的篩選條件標準之 Lambda 篩選條件。  | 
|  有效的 JSON  |  資料屬性沒有篩選條件模式  |  Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。  | 
|  有效的 JSON  |  非 JSON  |  Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選條件模式必須是有效的 JSON 格式。  | 
|  非 JSON  |  有效的 JSON  |  Lambda 捨棄記錄。  | 
|  非 JSON  |  資料屬性沒有篩選條件模式  |  Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。  | 
|  非 JSON  |  非 JSON  |  Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選條件模式必須是有效的 JSON 格式。  | 

# 教學課程： AWS Lambda 搭配 Amazon DynamoDB 串流使用
<a name="with-ddb-example"></a>

 在此教學課程中，您建立 Lambda 函數以從 Amazon DynamoDB Streams 中取用事件。

## 先決條件
<a name="with-ddb-prepare"></a>

### 安裝 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安裝 AWS Command Line Interface，請依照[安裝或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)中的步驟進行安裝。

本教學課程需使用命令列終端機或 Shell 來執行命令。在 Linux 和 macOS 中，使用您偏好的 Shell 和套件管理工具。

**注意**  
在 Windows 中，作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 `zip`)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本，請[安裝適用於 Linux 的 Windows 子系統](https://docs.microsoft.com/en-us/windows/wsl/install-win10)。

## 建立執行角色
<a name="with-ddb-create-execution-role"></a>

建立 [執行角色](lambda-intro-execution-role.md)，授予函數存取 AWS 資源的許可。

**若要建立執行角色**

1. 在 IAM 主控台中開啟[角色頁面](https://console.aws.amazon.com/iam/home#/roles)。

1. 選擇建**立角色**。

1. 建立具備下列屬性的角色。
   + **信任實體** - Lambda。
   + **許可** - **AWSLambdaDynamoDBExecutionRole**。
   + **角色名稱** - **lambda-dynamodb-role**。

**AWSLambdaDynamoDBExecutionRole** 具備函數自 DynamoDB 讀取項目以及寫入日誌到 CloudWatch Logs 時所需的許可。

## 建立函數
<a name="with-ddb-example-create-function"></a>

建立 Lambda 函數來處理 DynamoDB 事件。函數程式碼將一些傳入的事件資料寫入 CloudWatch Logs。

------
#### [ .NET ]

**適用於 .NET 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 .NET 搭配 Lambda 來使用 DynamoDB 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Go 搭配 Lambda 來使用 DynamoDB 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**適用於 Java 2.x 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Java 搭配 Lambda 來使用 DynamoDB 事件。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**適用於 JavaScript (v3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 來使用 DynamoDB 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
使用 TypeScript 搭配 Lambda 來使用 DynamoDB 事件。  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**適用於 PHP 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 PHP 搭配 Lambda 來使用 DynamoDB 事件。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**適用於 Python (Boto3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Python 搭配 Lambda 來使用 DynamoDB 事件。  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 來使用 DynamoDB 事件。  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**適用於 Rust 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Rust 搭配 Lambda 來使用 DynamoDB 事件。  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**建立函數**

1. 將範本程式碼複製到名為 `example.js` 的檔案。

1. 建立部署套件。

   ```
   zip function.zip example.js
   ```

1. 使用 `create-function` 命令建立一個 Lambda 函數。

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## 測試 Lambda 函數
<a name="with-dbb-invoke-manually"></a>

在此步驟中，您會使用 `invoke` AWS Lambda CLI 命令和下列範例 DynamoDB 事件手動叫用 Lambda 函數。將下列內容複製到名為 `input.txt` 的檔案。

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

執行以下 `invoke` 命令。

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

如果您使用的是第 2 AWS CLI 版，則需要 **cli-binary-format**選項。若要讓此成為預設的設定，請執行 `aws configure set cli-binary-format raw-in-base64-out`。若要取得更多資訊，請參閱*《AWS Command Line Interface 使用者指南第 2 版》*中 [AWS CLI 支援的全域命令列選項](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

該函數會在回應本文中傳回字串 `message` 訊息。

在 `outputfile.txt` 檔案中確認輸出。

## 建立啟用串流的 DynamoDB 資料表
<a name="with-ddb-create-buckets"></a>

建立啟用串流的 Amazon DynamoDB 資料表。

**建立 DynamoDB 資料表**

1. 開啟 [DynamoDB 主控台](https://console.aws.amazon.com/dynamodb)。

1. 選擇 **Create Table** (建立資料表)。

1. 根據下列設定建立資料表。
   + **Table name (資料表名稱)** – **lambda-dynamodb-stream** 
   + **Primary key** (主要金鑰) - **id** (字串)

1. 選擇**建立**。

**啟用串流**

1. 開啟 [DynamoDB 主控台](https://console.aws.amazon.com/dynamodb)。

1. 選擇 **資料表**。

1. 選擇 **lambda-dynamodb-stream** 資料表。

1. 在 **匯出與串流** 下，選擇 **DynamoDB 串流詳細資訊** 。

1. 選擇 **Turn on (開啟)**。

1. 對於**檢視類型**，選擇**僅索引鍵屬性**。

1. 選擇**開啟串流**。

寫下串流 ARN。在下個步驟將串流與您的 Lambda 函數相關聯時會需要用到它。如需啟用串流的詳細資訊，請參閱[使用 DynamoDB Streams 擷取資料表活動](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)。

## 在 中新增事件來源 AWS Lambda
<a name="with-ddb-attach-notification-configuration"></a>

在 AWS Lambda中建立事件來源映射。事件來源映射可將 DynamoDB 串流與您的 Lambda 函數相關聯。建立此事件來源映射後， 會 AWS Lambda 開始輪詢串流。

執行下列 AWS CLI `create-event-source-mapping` 命令。命令執行後，請記下 UUID。使用任何命令時，您會需要此 UUID 以參考到事件來源映射，例如當刪除事件來源映射的時候。

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 這會在指定的 DynamoDB 串流和 Lambda 函數之間建立映射。您可以將 DynamoDB 串流與多個 Lambda 函數相關聯，也可以將相同的 Lambda 函數與多個串流相關聯。但是 Lambda 函數會共用其所共有之串流的讀取傳送量。

您可以執行下列命令來取得事件來源映射的清單。

```
aws lambda list-event-source-mappings
```

該清單傳回所有您建立的事件來源映射，並針對每個映射顯示 `LastProcessingResult` 和其他內容。此欄位可用在發生問題時，提供資訊豐富的訊息。例如 `No records processed`（表示 AWS Lambda 尚未開始輪詢或串流中沒有記錄） 和 `OK`（表示 AWS Lambda 成功從串流讀取記錄並叫用您的 Lambda 函數） 的值表示沒有問題。如果發生問題，您會收到錯誤訊息。

如果您有許多事件來源映射，請使用函數式稱參數來縮小結果。

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## 測試設定
<a name="with-ddb-final-integration-test-no-iam"></a>

測試端對端的體驗。當您更新資料表時，DynamoDB 會將事件記錄寫入串流。當 AWS Lambda 輪詢串流時，若偵測到串流上的新記錄，便會透過傳送事件到函數來代表您調用 Lambda 函數。

1. 在 DynamoDB 主控台上針對資料表新增、更新、刪除項目。DynamoDB 會將這些動作的記錄寫入串流。

1. AWS Lambda 會輪詢串流，並在偵測到串流的更新時，透過傳入在串流中找到的事件資料來叫用 Lambda 函數。

1. 您的函數會執行並在 Amazon CloudWatch 中建立日誌。您可以驗證在 Amazon CloudWatch 主控台中報告的日誌。

## 後續步驟
<a name="with-ddb-next-steps"></a>

此教學課程介紹了使用 Lambda 處理 DynamoDB 串流事件的基礎知識。對於生產工作負載，建議實作部分批次回應邏輯，更有效率地處理個別記錄失敗。適用於 的 Powertools 批次[處理器公用程式](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) AWS Lambda 可在 Python、TypeScript、.NET 和 Java 中使用，並為此提供強大的解決方案，可自動處理部分批次回應的複雜性，並減少成功處理記錄的重試次數。

## 清除您的資源
<a name="cleanup"></a>

除非您想要保留為此教學課程建立的資源，否則您現在便可刪除。透過刪除您不再使用 AWS 的資源，您可以避免不必要的 費用 AWS 帳戶。

**若要刪除 Lambda 函數**

1. 開啟 Lambda 主控台中的 [函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇您建立的函數。

1. 選擇 **Actions** (動作)、**Delete** (刪除)。

1. 在文字輸入欄位中輸入 **confirm**，然後選擇**刪除**。

**刪除執行角色**

1. 開啟 IAM 主控台中的 [角色頁面](https://console.aws.amazon.com/iam/home#/roles) 。

1. 選取您建立的執行角色。

1. 選擇**刪除**。

1. 在文字輸入欄位中輸入角色的名稱，然後選擇 **刪除** 。

**若要刪除 DynamoDB 資料表**

1. 開啟 DynamoDB 主控台的 [資料表頁面](https://console.aws.amazon.com//dynamodb/home#tables:) 。

1. 選取您建立的資料表。

1. 選擇 **刪除** 。

1. 在文字方塊中輸入 **delete**。

1. 選擇 **刪除資料表** 。