

# 将 AWS Lambda 与 Amazon DynamoDB 结合使用
<a name="with-ddb"></a>

**注意**  
如果想要将数据发送到 Lambda 函数以外的目标，或要在发送数据之前丰富数据，请参阅 [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)（Amazon EventBridge 管道）。

您可以使用 AWS Lambda 函数来处理 [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)中的记录。使用 DynamoDB Streams，每次更新 DynamoDB 表时，您都可以触发 Lambda 函数以执行其他工作。

在处理 DynamoDB 流时，您需要实施部分批处理响应逻辑，以防止在批处理中的某些记录失败时重试成功处理的记录。Powertools for AWS Lambda 的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)适用于 Python、TypeScript、.NET 和 Java，并通过自动处理部分批处理响应逻辑简化了此实现，从而减少了开发时间并提高了可靠性。

**Topics**
+ [轮询和批处理流](#dynamodb-polling-and-batching)
+ [轮询和流的起始位置](#dyanmo-db-stream-poll)
+ [DynamoDB Streams 中的分片同时读取器](#events-dynamodb-simultaneous-readers)
+ [事件示例](#events-sample-dynamodb)
+ [使用 Lambda 处理 DynamoDB 记录](services-dynamodb-eventsourcemapping.md)
+ [使用 DynamoDB 和 Lambda 配置部分批处理响应](services-ddb-batchfailurereporting.md)
+ [在 Lambda 中保留 DynamoDB 事件源的丢弃记录](services-dynamodb-errors.md)
+ [在 Lambda 中实现有状态的 DynamoDB 流处理](services-ddb-windows.md)
+ [Amazon DynamoDB 事件源映射的 Lambda 参数](services-ddb-params.md)
+ [对 DynamoDB 事件源使用事件筛选](with-ddb-filtering.md)
+ [教程：将 AWS Lambda 与 Amazon DynamoDB Streams 结合使用](with-ddb-example.md)

## 轮询和批处理流
<a name="dynamodb-polling-and-batching"></a>

Lambda 以每秒 4 次的基本频率轮询 DynamoDB 流中的分区来获取记录。如果记录可用，Lambda 会调用函数并等待结果。如果处理成功，Lambda 会恢复轮询，直到其收到更多记录。

默认情况下，Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录，则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数，您可以配置 *batching window*（批处理时段），让事件源缓冲最多五分钟的记录。调用函数前，Lambda 会持续从事件源中读取记录，直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息，请参阅 [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Lambda 在发送下次批处理之前不会等待任何配置的[扩展](lambda-extensions.md)完成。换句话说，扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何[并发](lambda-concurrency.md)设置或限制，可能会导致节流问题。要检测这是否是潜在问题，请监控函数并检查所显示的[并发指标](monitoring-concurrency.md#general-concurrency-metrics)是否高于事件源映射的预期。由于调用间隔时间较短，Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。

配置 [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 设置以同时使用多个 Lambda 调用处理 DynamoDB 流的一个分片。您可以指定 Lambda 通过从 1（默认值）到 10 的并行化因子从分区中轮询的并发批次数。例如，假设您将 `ParallelizationFactor` 设置为 2，则最多可以有 200 个并发 Lambda 调用来处理 100 个 DynamoDB 流分片（但您可能实际上会看到不同的 `ConcurrentExecutions` 指标值）。这有助于在数据量不稳定并且 [IteratorAge](monitoring-metrics-types.md#performance-metrics) 较高时纵向扩展处理吞吐量。增加每个分片的并发批次数后，Lambda 仍然可以确保项目（分区和排序键）级别的顺序处理。

## 轮询和流的起始位置
<a name="dyanmo-db-stream-poll"></a>

请注意，事件源映射创建和更新期间的流轮询最终是一致的。
+ 在事件源映射创建期间，可能需要几分钟才能开始轮询来自流的事件。
+ 在事件源映射更新期间，可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着，如果你指定 `LATEST` 作为流的起始位置，事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件，请将流的起始位置指定为 `TRIM_HORIZON`。

## DynamoDB Streams 中的分片同时读取器
<a name="events-dynamodb-simultaneous-readers"></a>

对于作为非全局表的单区域表，您可以设计最多两个 Lambda 函数来同时从同一个 DynamoDB Streams 分片读取数据。超过此限制会导致请求被拒。对于全局表，我们建议您将并行函数的数量限制为一个，以避免请求节流。

## 事件示例
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# 使用 Lambda 处理 DynamoDB 记录
<a name="services-dynamodb-eventsourcemapping"></a>

创建事件源映射以指示 Lambda 将流中的记录发送到 Lambda 函数。您可以创建多个事件源映射，以使用多个 Lambda 函数处理相同的数据，或使用单个函数处理来自多个流的项目。

您可以配置事件源映射来处理来自不同 AWS 账户中的流的记录。要了解更多信息，请参阅[创建跨账户事件源映射](#services-dynamodb-eventsourcemapping-cross-account)。

要将函数配置为从 DynamoDB Streams 中读取，请将 [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html) AWS 托管策略附加到执行角色，然后创建 **DynamoDB** 触发器。

**要添加权限并创建触发器**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择一个函数的名称。

1. 选择 **Configuration**（配置）选项卡，然后选择 **Permissions**（权限）。

1. 在**角色名称**下，选择至执行角色的链接。此角色将在 IAM 控制台中打开角色。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/execution-role.png)

1. 选择**添加权限**，然后选择**附加策略**。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/attach-policies.png)

1. 在搜索字段中输入 `AWSLambdaDynamoDBExecutionRole`。向执行角色添加此策略。这是一项 AWS 托管策略，其中包含您的函数从 DynamoDB 流中读取所需的权限。有关此策略的更多信息，请参阅《AWS 托管式策略参考》**中的 [AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html)。

1. 在 Lambda 控制台中返回您的函数。在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/add-trigger.png)

1. 选择触发器类型。

1. 配置必填选项，然后选择 **Add**（添加）。

Lambda 支持 DynamoDB 事件源的以下选项：

**事件源选项**
+ **DynamoDB table**（DynamoDB 表）– 要从中读取记录的 DynamoDB 表。
+ **Batch size**（批处理大小）– 每个批次中要发送到函数的记录数，最高为 10000。Lambda 通过单个调用将批处理中的所有记录传递给函数，前提是事件的总大小未超出同步调用的[有效载荷限制](gettingstarted-limits.md)（6 MB）。
+ **Batch window**（批处理时段）– 指定在调用函数之前收集记录的最长时间（以秒为单位）。
+ **Starting position**（开始位置）– 仅处理新记录或所有现有记录。
  + **Latest**（最新）– 处理已添加到流中的新记录。
  + **Trim horizon**（时间范围）– 处理流中的所有记录。

  在处理任何现有记录后，函数将继续处理新记录。
+ **失败时的目标** – 无法处理的记录的标准 SQS 队列或标准 SNS 主题。当 Lambda 因为某批记录太旧或已用尽所有重试而将其丢弃时，Lambda 会将有关该批处理的详细信息发送到该队列或主题。
+ **Retry attempts**（重试次数）– 函数返回错误时 Lambda 重试的最大次数。这不适用于批处理未到达函数的服务错误或限制。
+ **Maximum age of record**（记录的最长时限）– Lambda 发送到您的函数的记录的最长期限。
+ **Split batch on error**（出现错误时拆分批）– 当函数返回错误时，在重试之前将批次拆分为两批。原始批量大小设置会保持不变。
+ **Concurrent batches per shard**（每个分片的并发批处理数）– 同时处理来自同一个分片的多个批处理。
+ **Enabled**（已启用）– 设置为 true 可启用事件源映射。设置为 false 可停止处理记录。Lambda 会跟踪已处理的最后一条记录，并在重新启用映射后从停止位置重新开始处理。

**注意**  
对于 Lambda 作为 DynamoDB 触发器的一部分而调用的 GetRecords API 调用，您无需付费。

之后，要管理事件源配置，请在设计器中选择触发器。

## 创建跨账户事件源映射
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

Amazon DynamoDB 现在支持[基于资源的策略](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html)。利用此功能，您可以使用另一个账户中的 Lambda 函数处理来自一个 AWS 账户的 DynamoDB 流中的数据。

要使用其他 AWS 账户中的 DynamoDB 流为您的 Lambda 函数创建事件源映射，您必须使用基于资源的策略配置该流，以向您的 Lambda 函数授予读取相关记录的权限。要了解如何配置流以实现跨账户访问，请参阅《Amazon DynamoDB 开发人员指南》**中的[与跨账户 Lambda 函数共享访问权限](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access)。

使用基于资源的策略配置流以向 Lambda 函数授予所需权限后，请使用跨账户流 ARN 创建事件源映射。您可以在跨账户 DynamoDB 控制台中表的**导出和流**选项卡下找到流 ARN。

使用 Lambda 控制台时，请将流 ARN 直接粘贴到事件源映射创建页面的 DynamoDB 表输入字段中。

 **注意：**不支持跨区域触发器。

# 使用 DynamoDB 和 Lambda 配置部分批处理响应
<a name="services-ddb-batchfailurereporting"></a>

在使用和处理来自事件源的流式数据时，默认情况下，Lambda 仅在批处理完全成功时，才会在批次的最高序列号处设置检查点。Lambda 会将所有其他结果视为完全失败并重试批处理，直至达到重试次数上限。要允许在处理来自流的批次时部分成功，请开启 `ReportBatchItemFailures`。允许部分成功有助于减少对记录重试的次数，尽管这并不能完全阻止在成功记录中重试的可能性。

要开启 `ReportBatchItemFailures`，请在 [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes) 列表中包含枚举值 **ReportBatchItemFailures**。此列表指示为函数启用了哪些响应类型。您可以在[创建](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)或[更新](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)事件源映射时配置此列表。

**注意**  
即使函数代码返回部分批处理失败响应，除非为事件源映射显式启用 `ReportBatchItemFailures` 功能，否则 Lambda 也不会处理这些响应。

## 报告语法
<a name="streams-batchfailurereporting-syntax"></a>

配置批处理项目失败的报告时，将返回 `StreamsEventResponse` 类，其中包含批处理项目失败列表。您可以使用 `StreamsEventResponse` 对象返回批处理中第一个失败记录的序列号。您还可以使用正确的响应语法来创建自己的自定义类。以下 JSON 结构显示了所需的响应语法：

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**注意**  
如果 `batchItemFailures` 数组包含多个项目，Lambda 会使用序列号最小的记录作为检查点。然后，Lambda 会重试从该检查点开始的所有记录。

## 成功和失败的条件
<a name="streams-batchfailurereporting-conditions"></a>

如果返回以下任意一项，则 Lambda 会将批处理视为完全成功：
+ 空的 `batchItemFailure` 列表
+ Null `batchItemFailure` 列表
+ 空的 `EventResponse`
+ Null `EventResponse`

如果返回以下任何一项，则 Lambda 会将批处理视为完全失败：
+ 空字符串 `itemIdentifier`
+ Null `itemIdentifier`
+ 包含错误密钥名的 `itemIdentifier`

Lambda 会根据您的重试策略在失败时重试。

## 将批次一分为二
<a name="streams-batchfailurereporting-bisect"></a>

如果调用失败并且已开启 `BisectBatchOnFunctionError`，则无论您的 `ReportBatchItemFailures` 设置如何，批次都将一分为二。

当收到批处理部分成功响应且同时开启 `BisectBatchOnFunctionError` 和 `ReportBatchItemFailures` 时，批次将在返回的序列号处一分为二，并且 Lambda 将仅重试剩余记录。

为了简化部分批处理响应逻辑的实现，请考虑使用 Powertools for AWS Lambda 中的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)，它可以自动为您处理这些复杂问题。

以下函数代码示例将返回批处理中处理失败消息的 ID 列表：

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 .NET 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Go 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Java 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
使用 JavaScript 报告 Lambda 的 DynamoDB 批处理项目失败。  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
使用 TypeScript 报告 Lambda 的 DynamoDB 批处理项目失败。  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 PHP 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Python 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Ruby 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)存储库中查找完整示例，并了解如何进行设置和运行。
报告使用 Rust 通过 Lambda 进行 DynamoDB 批处理项目失败。  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## 使用 Powertools for AWS Lambda 批处理器
<a name="services-ddb-batchfailurereporting-powertools"></a>

Powertools for AWS Lambda 中的批处理器实用程序会自动处理部分批处理响应逻辑，从而降低实施批处理故障报告的复杂性。下面是使用批处理器的示例：

**Python**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)。
使用 AWS Lambda 批处理器处理 DynamoDB 流记录。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)。
使用 AWS Lambda 批处理器处理 DynamoDB 流记录。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/)。
使用 AWS Lambda 批处理器处理 DynamoDB 流记录。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
有关完整的示例和设置说明，请参阅[批处理器文档](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/)。
使用 AWS Lambda 批处理器处理 DynamoDB 流记录。  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# 在 Lambda 中保留 DynamoDB 事件源的丢弃记录
<a name="services-dynamodb-errors"></a>

DynamoDB 事件源映射的错误处理取决于错误是在调用函数之前还是在函数调用期间发生的：
+ **调用前：**如果 Lambda 事件源映射由于节流或其他问题而无法调用该函数，则它会一直重试，直到记录过期或超过事件源映射上配置的最大期限（[MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)）。
+ **调用期间：**如果调用函数但返回错误，Lambda 会重试，直到记录过期、超过最大期限（[MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)）或达到配置的重试配额（[MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)）。对于函数错误，您还可以配置 [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError)，将失败的批次拆分为两个较小的批次，从而隔离错误记录并避免超时。拆分批次不会消耗重试配额。

如果错误处理措施失败，Lambda 将丢弃记录并继续处理数据流中的批次。使用默认设置时，这意味着错误的记录可能会阻止受影响的分区上的处理，时间长达一天。为了避免这种情况，请配置函数的事件源映射，使用合理的重试次数和适合您的使用案例的最长记录期限。

## 配置失败调用的目标
<a name="dynamodb-on-failure-destination-console"></a>

要保留失败的事件源映射调用的记录，请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档，其中包含有关失败调用的元数据。对于 Amazon S3 目标，Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列、Amazon S3 存储桶或 Kafka 配置为目标。

借助 Amazon S3 目标，您可以使用 [Amazon S3 事件通知](https://docs.aws.amazon.com/)功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。

您的执行角色必须具有目标的权限：
+ **对于 SQS 目标：**[sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **对于 SNS 目标：**[sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **对于 S3 目标：**[ s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) 和 [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **对于 Kafka 目标：**[kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

您可以将 Kafka 主题配置为 Kafka 事件源映射失败时的目标。当 Lambda 在重试次数用尽后仍无法处理记录，或者当记录的保存时间超过最大期限时，Lambda 会将这些失败的记录发送至指定的 Kafka 主题，以便后续进行处理。请参考[使用 Kafka 主题作为失败时的目标](kafka-on-failure-destination.md)。

如果您已使用自己的 KMS 密钥为 S3 目标启用加密，则函数的执行角色还必须具有调用 [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) 的权限。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中，请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。

要使用控制台配置失败时的目标，请执行以下步骤：

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择函数。

1. 在 **Function overview (函数概览)** 下，选择 **Add destination (添加目标)**。

1. 对于**源**，请选择**事件源映射调用**。

1. 对于**事件源映射**，请选择为此函数配置的事件源。

1. 在**条件**中，选择**失败时**。对于事件源映射调用，这是唯一可接受的条件。

1. 对于**目标类型**，请选择 Lambda 要发送调用记录的目标类型。

1. 对于 **Destination (目标)**，请选择一个资源。

1. 选择**保存**。

您还可以使用 AWS Command Line Interface（AWS CLI）配置失败时的目标。例如，以 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令将带有 SQS 失败时目标的事件源映射添加到 `MyFunction`：

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

以下 [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令更新事件源映射，以在两次重试之后或记录超过一小时后将失败的调用记录发送到 SNS 目标。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

更新的设置是异步应用的，并且直到该过程完成才反映在输出中。使用 [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) 命令查看当前状态。

要移除目标，请提供一个空字符串作为 `destination-config` 参数的实际参数：

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 目标的安全最佳实践
<a name="ddb-s3-destination-security"></a>

如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除，则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称，则他们可以在其 AWS 账户中重新创建存储桶。调用失败的记录将发送到存储桶，这可能会暴露您函数中的数据。

**警告**  
为确保您的函数中的调用记录不会发送到另一个 AWS 账户中的 S3 存储桶，请向函数的执行角色添加条件，以将 `s3:PutObject` 权限限制为您账户中的存储桶。

以下示例显示了一个 IAM 策略，该策略将您函数的 `s3:PutObject` 权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 `s3:ListBucket` 权限。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

要使用 AWS 管理控制台 或 AWS CLI 向函数的执行角色添加权限策略，请参阅以下程序中的说明：

------
#### [ Console ]

**向函数的执行角色添加权限策略（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择想要修改其执行角色的 Lambda 函数。

1. 在**配置**选项卡中，选择**权限**。

1. 在**执行角色**选项卡中，选择您的函数的**角色名称**,以打开该角色的 IAM 控制台页面。

1. 通过执行以下操作，向角色添加权限策略：

   1. 在**权限策略**窗格中，选择**添加权限**，然后选择**创建内联策略**。

   1. 在**策略编辑器**中，选择 **JSON**。

   1. 将要添加的策略粘贴到编辑器中（替换现有 JSON），然后选择**下一步**。

   1. 在**策略详细信息**下，输入**策略名称**。

   1. 选择**创建策略**。

------
#### [ AWS CLI ]

**向函数的执行角色添加权限策略（CLI）**

1. 创建具有所需权限的 JSON 策略文档并将其保存在本地目录中。

1. 使用 IAM `put-role-policy` CLI 命令向您函数的执行角色添加权限。在您保存 JSON 策略文档的目录中运行以下命令，并用您自己的值替换角色名称、策略名称和策略文档。

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Amazon SNS 和 Amazon SQS 调用记录示例
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

以下示例显示了 Lambda 发送到 DynamoDB 流的 SQS 或 SNS 目标的调用记录。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

您可以使用此信息从流中检索受影响的记录以进行故障排除。实际记录不包括在内，因此您必须处理此记录并在记录过期和丢失之前从流中检索它们。

### Amazon S3 调用记录示例
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

以下示例显示了 Lambda 发送到 DynamoDB 流的 S3 存储桶的调用记录。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外，`payload` 字段还包含作为转义 JSON 字符串的原始调用记录。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

包含了调用记录的 S3 对象使用以下命名约定：

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# 在 Lambda 中实现有状态的 DynamoDB 流处理
<a name="services-ddb-windows"></a>

Lambda 函数可以运行连续流处理应用程序。流表示通过您的应用程序持续流动的无边界数据。要分析这种不断更新的输入中的信息，可以使用按时间定义的窗口来限制包含的记录。

滚动窗口是定期打开和关闭的不同窗口。预设情况下，Lambda 调用是无状态的，在没有外部数据库的情况下，无法使用它们跨多次连续调用处理数据。但是，有了滚动窗口后，您可以在不同调用中保持状态。此状态包含之前为当前窗口处理的消息的汇总结果。您的状态最多可以是每个分片 1MB。如果超过该大小，Lambda 将提前终止窗口。

流中的每条记录都属于特定窗口。Lambda 将至少处理每条记录一次，但不保证每条记录只处理一次。在极少数情况下（例如错误处理），某些记录可能会被多次处理。第一次处理记录时始终按顺序处理。如果多次处理记录，则可能会不按顺序处理。

## 聚合和处理
<a name="streams-tumbling-processing"></a>

系统将调用您的用户托管函数以便聚合和处理该聚合的最终结果。Lambda 汇总在该窗口中接收的所有记录。您可以分多个批次接收这些记录，每个批次都作为单独的调用。每次调用都会收到一个状态。因此，当使用滚动窗口时，Lambda 函数响应必须包含 `state` 属性。如果响应不包含 `state` 属性，Lambda 会将其视作失败的调用。为了满足该条件，您的函数可以返回一个具有以下 JSON 形状的 `TimeWindowEventResponse` 对象：

**Example `TimeWindowEventResponse` 值**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**注意**  
对于 Java 函数，我们建议使用 `Map<String, String>` 来表示状态。

在窗口末尾，标志 `isFinalInvokeForWindow` 被设置 `true`，以表示这是最终状态，并且已准备好进行处理。处理完成后，窗口完成，最终调用完成，然后状态将被删除。

在窗口结束时，Lambda 会对针对聚合结果的操作应用最终处理。您的最终处理将同步调用。成功调用后，函数会检查序列号并继续进行流处理。如果调用失败，则您的 Lambda 函数将暂停进一步处理，直到成功调用为止。

**Example DynamodbTimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## 配置
<a name="streams-tumbling-config"></a>

您可以在创建或更新事件源映射时配置滚动窗口。要配置翻转窗口，请以秒为单位进行指定（[TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)）。以下示例 AWS Command Line Interface (AWS CLI) 命令会创建一个滚动窗口为 120 秒的流式事件源映射。为聚合和处理定义的 Lambda 函数被命名为 `tumbling-window-example-function`。

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda 根据记录插入到流的时间来确定滚动窗口的边界。所有记录都有一个大致的时间戳，供 Lambda 在确定边界时使用。

滚动窗口聚合不支持重新分片。当分区结束时，Lambda 会认为窗口已关闭，子分区将以全新的状态启动自己的窗口。

滚动窗口完全支持现有的重试策略 `maxRetryAttempts` 和 `maxRecordAge`。

**Example Handler.py – 聚合和处理**  
以下 Python 函数演示了如何聚合然后处理您的最终状态：  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Amazon DynamoDB 事件源映射的 Lambda 参数
<a name="services-ddb-params"></a>

所有 Lambda 事件源类型共享相同的 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) 和 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API 操作。但是，只有部分参数适用于 DynamoDB Streams。


| 参数 | 必需 | 默认值 | 备注 | 
| --- | --- | --- | --- | 
|  BatchSize  |  否  |  100  |  最大值：10000  | 
|  BisectBatchOnFunctionError  |  N  |  false  | none  | 
|  DestinationConfig  |  N  | 不适用  |  丢弃的记录的标准 Amazon SQS 队列或标准 Amazon SNS 主题目标。  | 
|  启用  |  N  |  真实  | none  | 
|  EventSourceArn  |  Y  | 不适用 |  数据流或流使用者的 ARN  | 
|  FilterCriteria  |  N  | 不适用  |  [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)  | 
|  FunctionName  |  是  | 不适用  | none  | 
|  FunctionResponseTypes  |  N  | 不适用 |  要使您的函数报告某个批处理中的特定失败，请在 `FunctionResponseTypes` 中包含值 `ReportBatchItemFailures`。有关更多信息，请参阅 [使用 DynamoDB 和 Lambda 配置部分批处理响应](services-ddb-batchfailurereporting.md)。  | 
|  MaximumBatchingWindowInSeconds  |  N  |  0  | none  | 
|  MaximumRecordAgeInSeconds  |  N  |  –1  |  -1 表示无限：会一直重试失败的记录，直到记录过期。[DynamoDB Streams 的数据留存期限为](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention) 24 小时。 最小值：-1 最大值：604800  | 
|  MaximumRetryAttempts  |  N  |  –1  |  -1 表示无限：会一直重试失败的记录，直到记录过期。 最小值：0 最大值：10000  | 
|  ParallelizationFactor  |  N  |  1  |  最大值：10  | 
|  StartingPosition  |  Y  | 不适用  |  TRIM\$1HORIZON 或 LATEST（最新）  | 
|  TumblingWindowInSeconds  |  N  | 不适用  |  最小值：0 最大值：900  | 

# 对 DynamoDB 事件源使用事件筛选
<a name="with-ddb-filtering"></a>

您可以使用事件筛选，控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息，请参阅 [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)。

本节重点介绍 DynamoDB 事件源的事件筛选。

**注意**  
DynamoDB 事件源映射仅支持对 `dynamodb` 键进行筛选。

**Topics**
+ [DynamoDB 事件](#filtering-ddb)
+ [使用表格属性进行筛选](#filtering-ddb-attributes)
+ [使用布尔表达式进行筛选](#filtering-ddb-boolean)
+ [使用 Exists 运算符](#filtering-ddb-exists)
+ [用于 DynamoDB 筛选的 JSON 格式](#filtering-ddb-JSON-format)

## DynamoDB 事件
<a name="filtering-ddb"></a>

假设您有一个 DynamoDB 表，其中包含主键 `CustomerName`、属性 `AccountManager` 和 `PaymentTerms`。下面显示了来自 DynamoDB 表流的示例记录。

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

要根据 DynamoDB 表中的键和属性值进行筛选，请使用记录中的 `dynamodb` 键。以下部分提供了不同筛选条件类型的示例。

### 使用表键进行筛选
<a name="filtering-ddb-keys"></a>

假设您希望函数仅处理主键 `CustomerName` 为“AnyCompany Industries”的记录。`FilterCriteria` 对象将如下所示。

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## 使用表格属性进行筛选
<a name="filtering-ddb-attributes"></a>

借助 DynamoDB，您还可以使用 `NewImage` 和 `OldImage` 键来筛选属性值。假设您要筛选最新表格图像中 `AccountManager` 属性为“Pat Candella”或“Shirley Rodriguez”的记录。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## 使用布尔表达式进行筛选
<a name="filtering-ddb-boolean"></a>

您也可以使用布尔 AND 表达式创建筛选器。这些表达式可能同时包含表的键和属性参数。假设您想要筛选 `AccountManager` 的 `NewImage` 值为“Pat Candella”且 `OldImage` 值为“Terry Whitlock”的记录。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**注意**  
DynamoDB 事件筛选不支持使用数字运算符（数字相等和数字范围）。即使表中的项目存储为数字，这些参数也会转换为 JSON 记录对象中的字符串。

## 使用 Exists 运算符
<a name="filtering-ddb-exists"></a>

鉴于 DynamoDB 中 JSON 事件对象的结构方式，使用 Exists 运算符需要特别小心。Exists 运算符仅适用于事件 JSON 中的叶节点，若筛选条件模式使用 Exists 来测试中间节点，则不会起作用。请考虑以下 DynamoDB 表项目：

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

您可能需要创建如下所示的筛选条件模式来测试包含 `"Organizations"` 的事件：

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

不过，此筛选条件模式永远不会返回匹配项，因为 `"Organizations"` 不是叶节点。以下示例展示了如何正确使用 Exists 运算符来构造所需的筛选条件模式：

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## 用于 DynamoDB 筛选的 JSON 格式
<a name="filtering-ddb-JSON-format"></a>

要正确筛选 DynamoDB 源中的事件，数据字段及其筛选条件 (`dynamodb`) 都必须为有效的 JSON 格式。如果任一字段不为有效的 JSON 格式，Lambda 将会丢弃消息或引发异常。下表汇总了具体行为：


| 传入数据格式 | 数据属性中的筛选条件模式格式 | 导致的操作 | 
| --- | --- | --- | 
|  有效 JSON  |  有效 JSON  |  Lambda 根据您的筛选条件进行筛选。  | 
|  有效 JSON  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  非 JSON  |  Lambda 在事件源映射创建或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。  | 
|  非 JSON  |  有效 JSON  |  Lambda 将丢弃记录。  | 
|  非 JSON  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  非 JSON  |  非 JSON  |  Lambda 在事件源映射创建或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。  | 

# 教程：将 AWS Lambda 与 Amazon DynamoDB Streams 结合使用
<a name="with-ddb-example"></a>

 在本教程中，您将创建 Lambda 函数处理来自 Amazon DynamoDB Streams 的事件。

## 先决条件
<a name="with-ddb-prepare"></a>

### 安装 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安装 AWS Command Line Interface，请按照[安装或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中，可使用您首选的 Shell 和程序包管理器。

**注意**  
在 Windows 中，操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令（例如 `zip`）。[安装 Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10)，获取 Ubuntu 和 Bash 与 Windows 集成的版本。

## 创建执行角色
<a name="with-ddb-create-execution-role"></a>

创建[执行角色](lambda-intro-execution-role.md)，向您的函数授予访问 AWS 资源的权限。

**创建执行角色**

1. 在 IAM 控制台中，打开 [Roles（角色）页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择**创建角色**。

1. 创建具有以下属性的角色。
   + **Trusted entity**（可信任的实体）– Lambda。
   + **Permissions**（权限）– **AWSLambdaDynamoDBExecutionRole**。
   + **Role name**（角色名称）– **lambda-dynamodb-role**。

**AWSLambdaDynamoDBExecutionRole** 具有该函数所需的权限以从 DynamoDB 中读取项目并将日志写入 CloudWatch Logs。

## 创建函数
<a name="with-ddb-example-create-function"></a>

创建一个 Lambda 函数来处理 DynamoDB 事件。函数代码会将一些传入的事件数据写入 CloudWatch Logs。

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 .NET 将 DynamoDB 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Go 将 DynamoDB 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Java 将 DynamoDB 事件与 Lambda 结合使用。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 JavaScript 将 DynamoDB 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
使用 TypeScript 将 DynamoDB 事件与 Lambda 结合使用。  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 PHP 将 DynamoDB 事件与 Lambda 结合使用。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Python 将 DynamoDB 事件与 Lambda 结合使用。  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Ruby 将 DynamoDB 事件与 Lambda 结合使用。  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Rust 将 DynamoDB 事件与 Lambda 结合使用。  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**创建函数**

1. 将示例代码复制到名为 `example.js` 的文件中。

1. 创建部署程序包。

   ```
   zip function.zip example.js
   ```

1. 使用 `create-function` 命令创建 Lambda 函数。

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## 测试 Lambda 函数
<a name="with-dbb-invoke-manually"></a>

在本步骤中，您将使用 `invoke` AWS Lambda CLI 命令和以下示例 DynamoDB 事件手动调用 Lambda 函数。将以下内容复制到名为 `input.txt` 的文件中。

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

运行以下 `invoke` 命令：

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

如果使用 **cli-binary-format** 版本 2，则 AWS CLI 选项是必需的。要将其设为默认设置，请运行 `aws configure set cli-binary-format raw-in-base64-out`。有关更多信息，请参阅*版本 2 的 AWS Command Line Interface 用户指南*中的 [AWS CLI 支持的全局命令行选项](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

函数在响应正文中返回字符串 `message`。

在 `outputfile.txt` 文件中验证输出。

## 创建启用流的 DynamoDB 表
<a name="with-ddb-create-buckets"></a>

创建启用流的 Amazon DynamoDB 表。

**创建 DynamoDB 表**

1. 打开 [DynamoDB 控制台](https://console.aws.amazon.com/dynamodb)。

1. 选择 **Create Table**。

1. 使用以下设置创建表。
   + **Table name**（表名称）- **lambda-dynamodb-stream**
   + **Primary key**（主键）– **id**（字符串）

1. 选择**创建**。

**启用流**

1. 打开 [DynamoDB 控制台](https://console.aws.amazon.com/dynamodb)。

1. 选择**表**。

1. 选择 **lambda-dynamodb-stream** 表。

1. 在 **Exports and streams**（导出和流）下，选择 **DynamoDB stream details**（DynamoDB 流详细信息）。

1. 选择**打开**。

1. 对于**视图类型**，选择**仅键属性**。

1. 选择**开启流**。

记下流 ARN。在下一步中将该流与 Lambda 函数关联时，您将需要此类信息。有关启用流的更多信息，请参阅[使用 DynamoDB Streams 捕获表活动](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)。

## 在 AWS Lambda 中添加事件源
<a name="with-ddb-attach-notification-configuration"></a>

在 AWS Lambda 中创建事件源映射。此事件源映射将 DynamoDB Streams 与 Lambda 函数关联。创建此事件源映射后，AWS Lambda 即开始轮询该流。

运行以下 AWS CLI `create-event-source-mapping` 命令。命令运行后，记下 UUID。在任何命令中，如删除事件源映射时，您都需要该 UUID 来引用事件源映射。

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 这会在指定的 DynamoDB Streams 和 Lambda 函数之间创建映射。您可将一个 DynamoDB Streams 关联到多个 Lambda 函数，也可将同一个 Lambda 函数关联到多个流。但是，Lambda 函数将共享其所共享的流的读取吞吐量。

您可以通过运行以下命令获取事件源映射的列表。

```
aws lambda list-event-source-mappings
```

该列表返回您创建的所有事件源映射，而对于每个映射，它都显示 `LastProcessingResult` 等信息。该字段用于在出现任何问题时提供信息性消息。`No records processed`（指示 AWS Lambda 未开始轮询或流中没有任何记录）和 `OK`（指示 AWS Lambda 已成功读取流中的记录并已调用 Lambda 函数）等值表示未出现任何问题。如果出现问题，您将收到一条错误消息。

如果您有大量事件源映射，请使用函数名称参数缩窄结果范围。

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## 测试设置
<a name="with-ddb-final-integration-test-no-iam"></a>

测试端到端体验。当您更新表时，DynamoDB 会将事件记录写入流。当 AWS Lambda 轮询该流时，它将在流中检测新记录并通过向该函数传递事件来代表您调用 Lambda 函数。

1. 在 DynamoDB 控制台中，添加、更新、删除表中的项目。DynamoDB 会将这些操作记录写入流。

1. AWS Lambda 会轮询该流，当检测到流有更新时，它会通过传递在流中发现的事件数据来调用 Lambda 函数。

1. 函数运行并在 Amazon CloudWatch 中创建日志。您可以验证 Amazon CloudWatch 控制台中报告的日志。

## 后续步骤
<a name="with-ddb-next-steps"></a>

本教程介绍使用 Lambda 处理 DynamoDB 流事件的基础知识。对于生产工作负载，请考虑实施部分批处理响应逻辑，以更有效地处理单个记录故障。Powertools for AWS Lambda 中的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)适用于 Python、TypeScript、.NET 和 Java，并为此提供了强大的解决方案，可以自动处理部分批处理响应的复杂性并减少成功处理记录的重试次数。

## 清除资源
<a name="cleanup"></a>

除非您想要保留为本教程创建的资源，否则可立即将其删除。通过删除您不再使用的 AWS 资源，可防止您的 AWS 账户 产生不必要的费用。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择**删除**。

**删除执行角色**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择您创建的执行角色。

1. 选择**删除**。

1. 在文本输入字段中输入角色名称，然后选择**删除**。

**删除 DynamoDB 表**

1. 打开 DynamoDB 控制台中 [Tables page](https://console.aws.amazon.com//dynamodb/home#tables:)（表页面）。

1. 选择您创建的表。

1. 选择 **Delete**。

1. 在文本框中输入 **delete**。

1. 选择 **删除表**。