

# 使用 Lambda 处理 Amazon DocumentDB 事件
<a name="with-documentdb"></a>

您可以通过将 Amazon DocumentDB 集群配置为事件源，从而使用 Lambda 函数来处理 [Amazon DocumentDB（与 MongoDB 兼容）更改流](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html)中的事件。然后，您可以在 Amazon DocumentDB 集群每次发生数据更改时调用 Lambda 函数，从而实现事件驱动型工作负载的自动化。

**注意**  
Lambda 仅支持 Amazon DocumentDB 4.0 和 5.0 版本。Lambda 不支持 3.6 版本。  
此外，对于事件源映射，Lambda 仅支持基于实例的集群和区域性集群。Lambda 不支持 [弹性集群](https://docs.aws.amazon.com/documentdb/latest/developerguide/docdb-using-elastic-clusters.html) 或 [全球集群](https://docs.aws.amazon.com/documentdb/latest/developerguide/global-clusters.html)。当使用 Lambda 作为客户端连接到 Amazon DocumentDB 时，此限制不适用。Lambda 可以连接到所有集群类型来执行 CRUD 操作。

Lambda 按照事件到达的顺序依次处理 Amazon DocumentDB 更改流中的事件。因此，您的函数一次只能处理一条来自 Amazon DocumentDB 的并发调用。要监控函数，您可以跟踪其[并发指标](https://docs.aws.amazon.com/lambda/latest/dg/monitoring-concurrency.html)。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

**Topics**
+ [Amazon DocumentDB 事件示例](#docdb-sample-event)
+ [先决条件和权限](#docdb-prereqs)
+ [配置网络安全](#docdb-network)
+ [创建 Amazon DocumentDB 事件源映射（控制台）](#docdb-configuration)
+ [创建 Amazon DocumentDB 事件源映射（SDK 或 CLI）](#docdb-api)
+ [轮询和流的起始位置](#docdb-stream-polling)
+ [监控 Amazon DocumentDB 事件源](#docdb-monitoring)
+ [教程：将 AWS Lambda 与 Amazon DocumentDB 流结合使用](with-documentdb-tutorial.md)

## Amazon DocumentDB 事件示例
<a name="docdb-sample-event"></a>

```
{
    "eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03",
    "events": [
        {
            "event": {
                "_id": {
                    "_data": "0163eeb6e7000000090100000009000041e1"
                },
                "clusterTime": {
                    "$timestamp": {
                        "t": 1676588775,
                        "i": 9
                    }
                },
                "documentKey": {
                    "_id": {
                        "$oid": "63eeb6e7d418cd98afb1c1d7"
                    }
                },
                "fullDocument": {
                    "_id": {
                        "$oid": "63eeb6e7d418cd98afb1c1d7"
                    },
                    "anyField": "sampleValue"
                },
                "ns": {
                    "db": "test_database",
                    "coll": "test_collection"
                },
                "operationType": "insert"
            }
        }
    ],
    "eventSource": "aws:docdb"
}
```

有关此示例中的事件及其形状的更多信息，请参阅 MongoDB 文档网站上的[更改事件](https://www.mongodb.com/docs/manual/reference/change-events/)。

## 先决条件和权限
<a name="docdb-prereqs"></a>

将 Amazon DocumentDB 用作 Lambda 函数的事件源之前，请注意以下先决条件。您必须：
+ **有一个现有的 Amazon DocumentDB 集群并且该集群必须与函数位于同一 AWS 账户和 AWS 区域中。**如果您没有现有的集群，可以按照《Amazon DocumentDB 开发者**指南》中的 [Amazon DocumentDB 入门](https://docs.aws.amazon.com/documentdb/latest/developerguide/get-started-guide.html)创建一个。此外，[教程：将 AWS Lambda 与 Amazon DocumentDB 流结合使用](with-documentdb-tutorial.md) 中的第一组步骤将指导您创建满足所有必要先决条件的 Amazon DocumentDB 集群。
+ **允许 Lambda 访问与 Amazon DocumentDB 集群关联的 Amazon Virtual Private Cloud（Amazon VPC）资源的权限。**有关更多信息，请参阅 [配置网络安全](#docdb-network)。
+ **在 Amazon DocumentDB 集群上启用 TLS。**这是默认设置。如果您禁用了 TLS，Lambda 将无法与您的集群通信。
+ **在 Amazon DocumentDB 集群上激活更改流。**有关更多信息，请参阅《Amazon DocumentDB 开发人员指南**》中的[将更改流与 Amazon DocumentDB 搭配使用](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html)。
+ **为 Lambda 提供访问 Amazon DocumentDB 集群的凭证。**在设置事件源时，请提供包含访问集群所需的身份验证详细信息（用户名和密码）的 [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 密钥。要在设置过程中提供此密钥，请执行以下任一操作：
  + 如果您使用 Lambda 控制台进行设置，请在 **Secrets Manager 密钥**字段中提供密钥。
  + 如果您使用 AWS Command Line Interface（AWS CLI）进行设置，请在 `source-access-configurations` 选项中提供此密钥。您可以将此选项包含在 [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令或 [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令中。例如：

    ```
    aws lambda create-event-source-mapping \
        ...
        --source-access-configurations  '[{"Type":"BASIC_AUTH","URI":"arn:aws:secretsmanager:us-west-2:123456789012:secret:DocDBSecret-AbC4E6"}]' \
        ...
    ```
+ **向 Lambda 授予管理与 Amazon DocumentDB 流相关的资源的权限。**将以下角色权限手动添加到您的函数的 [执行角色](lambda-intro-execution-role.md)：
  + [rds:DescribeDBClusters](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_DescribeDBClusters.html)
  + [rds:DescribeDBClusterParameters](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_DescribeDBClusterParameters.html)
  + [rds:DescribeDBSubnetGroups](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_DescribeDBSubnetGroups.html)
  + [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
  + [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
  + [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
  + [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
  + [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
  + [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)
  + [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html)
  + [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ **将您发送到 Lambda 的 Amazon DocumentDB 更改流事件的大小保持在 6MB 以下。**Lambda 支持最大 6MB 的负载大小。如果更改流尝试向 Lambda 发送大于 6MB 的事件，Lambda 会丢弃该消息并发送 `OversizedRecordCount` 指标。Lambda 将尽力发送所有指标。

**注意**  
尽管 Lambda 函数的最大超时限制通常为 15 分钟，但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射，仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

## 配置网络安全
<a name="docdb-network"></a>

要通过事件源映射向 Lambda 提供对 Amazon DocumentDB 的完全访问权限，集群必须使用公有端点（公有 IP 地址），或者您必须提供对您在其中创建了集群的 Amazon VPC 的访问权限。

将 Amazon DocumentDB 与 Lambda 结合使用时，创建 [AWS PrivateLink VPC 端点](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html)，以便您的函数可以访问 Amazon VPC 中的资源。

**注意**  
对于使用事件轮询器的默认（按需）模式的事件源映射函数，需要 AWS PrivateLink VPC 端点。如果事件源映射使用[预调配模式](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)，则无需配置 AWS PrivateLink VPC 端点。

创建端点以提供对以下资源的访问权限：
+  Lambda：为 Lambda 服务主体创建端点。
+  AWS STS：为 AWS STS 创建端点，以便服务主体代您代入角色。
+  Secrets Manager：如果集群使用 Secrets Manager 来存储凭证，请为 Secrets Manager 创建端点。

也可以在 Amazon VPC 中的每个公有子网上配置 NAT 网关。有关更多信息，请参阅 [为连接到 VPC 的 Lambda 函数启用互联网访问权限](configuration-vpc-internet.md)。

为 Amazon DocumentDB 创建事件源映射时，Lambda 会检查为 Amazon VPC 配置的子网和安全组是否已经存在弹性网络接口（ENI）。如果 Lambda 发现现有 ENI，则会尝试重用这些 ENI。否则，Lambda 会创建新的 ENI 来连接到事件源并调用函数。

**注意**  
Lambda 函数始终在 Lambda 服务拥有的 Amazon VPC 中运行。函数的 VPC 配置不会影响事件源映射。只有事件源的网络配置才能决定 Lambda 连接到事件源的方式。

为包含集群的 Amazon VPC 配置安全组。默认情况下，Amazon DocumentDB 使用以下端口：`27017`。
+ 入站规则：允许与事件源关联安全组的默认代理端口的所有流量。或者，您可以使用自引用安全组规则允许来自同一安全组内的实例进行访问。
+ 出站规则 – 如果您的函数需要与 AWS 服务进行通信，则允许端口 `443` 上的所有流量向外部目标传输。或者，如果您不需要与其他 AWS 服务通信，也可以使用自引用的安全组规则来限制对代理的访问权限。
+ Amazon VPC 端点入站规则：如果您正在使用 Amazon VPC 端点，则与 Amazon VPC 端点关联的安全组，必须允许来自集群安全组的端口 `443` 上的入站流量。

如果集群使用身份验证，您还可以限制 Secrets Manager 端点的端点策略。要调用 Secrets Manager API，Lambda 会使用函数角色而非 Lambda 服务主体。

**Example VPC 端点策略：Secrets Manager 端点**  

```
{
      "Statement": [
          {
              "Action": "secretsmanager:GetSecretValue",
              "Effect": "Allow",
              "Principal": {
                  "AWS": [
                      "arn:aws::iam::123456789012:role/my-role"
                  ]
              },
              "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
          }
      ]
  }
```

当您使用 Amazon VPC 端点时，AWS 会使用端点的弹性网络接口（ENI）路由 API 调用来调用函数。Lambda 服务主体需要针对使用这些 ENI 的任何角色和函数调用 `lambda:InvokeFunction`。

默认情况下，Amazon VPC 端点具有开放的 IAM 策略，允许对资源进行广泛访问。最佳实践是，将这些策略限制为使用该端点执行所需的操作。为确保事件源映射能够调用 Lambda 函数，VPC 端点策略必须允许 Lambda 服务主体调用 `sts:AssumeRole` 和 `lambda:InvokeFunction`。将 VPC 端点策略限制为仅允许来自组织内部的 API 调用，会导致事件源映射无法正常运行，因此这些策略中需要 `"Resource": "*"`。

以下 VPC 端点策略示例展示了如何向 AWS STS 的 Lambda 服务主体和 Lambda 端点授予所需的访问权限。

**Example VPC 端点策略 – AWS STS 端点**  

```
{
      "Statement": [
          {
              "Action": "sts:AssumeRole",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
    }
```

**Example VPC 端点策略 – Lambda 端点**  

```
{
      "Statement": [
          {
              "Action": "lambda:InvokeFunction",
              "Effect": "Allow",
              "Principal": {
                  "Service": [
                      "lambda.amazonaws.com"
                  ]
              },
              "Resource": "*"
          }
      ]
  }
```

## 创建 Amazon DocumentDB 事件源映射（控制台）
<a name="docdb-configuration"></a>

要配置 Lambda 函数以从 Amazon DocumentDB 集群更改流中读取数据，请创建一个 [事件源映射](invocation-eventsourcemapping.md)。这一部分介绍了如何从 Lambda 控制台执行此操作。有关 AWS SDK 和 AWS CLI 说明，请参阅 [创建 Amazon DocumentDB 事件源映射（SDK 或 CLI）](#docdb-api)。

**要创建 Amazon DocumentDB 事件源映射（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择一个函数的名称。

1. 在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。

1. 在**触发器配置**下的下拉列表中，选择 **DocumentDB**。

1. 配置必填选项，然后选择 **Add**（添加）。

Lambda 支持 Amazon DocumentDB 事件源的以下选项：
+ **DocumentDB 集群** – 选择一个 Amazon DocumentDB 集群。
+ **激活触发器** – 选择是否要立即激活触发器。如果选中此复选框，则在创建事件源映射后，您的函数会立即开始接收来自指定 Amazon DocumentDB 更改流的流量。我们建议取消选中此复选框，以便在禁用状态下创建事件源映射以进行测试。创建事件源映射后，您可以随时将其激活。
+ **数据库名称** – 输入集群中要使用的数据库的名称。
+ （可选）**集合名称** – 输入数据库中要使用的集合的名称。如果您未指定集合，Lambda 将侦听来自数据库中每个集合的所有事件。
+ **批处理大小** – 设置要在单个批处理中检索的最大消息数，最高为 1 万。默认批处理大小为 100。
+ **开始位置** – 选择将从流中开始读取记录的位置。
  + **最新** – 仅处理添加到流中的新记录。您的函数仅在 Lambda 完成创建事件源后才会开始处理记录。这意味着在成功创建事件源之前，某些记录可能会被丢弃。
  + **最早**：处理流中的所有记录。Lambda 使用集群的日志保留期来确定从哪里开始读取事件。具体而言，Lambda 将从 `current_time - log_retention_duration` 开始读取事件。更改流必须在此时间戳之前已经处于活动状态，才能确保 Lambda 正确读取所有事件。
  + **在时间戳处**：处理从特定时间开始的记录。更改流必须在指定的时间戳之前已经处于活动状态，才能确保 Lambda 正确读取所有事件。
+ **身份验证** – 选择访问集群中的代理时将使用的身份验证方法。
  + **BASIC\$1AUTH** – 使用基本身份验证时，您必须提供包含访问集群凭据所需凭证的 Secrets Manager 密钥。
+ **Secrets Manager 密钥** – 选择包含访问 Amazon DocumentDB 集群所需身份验证详细信息（用户名和密码）的 Secrets Manager 密钥。
+ （可选）**批处理时长** – 在调用函数之前收集记录的最长时间（以秒为单位，最大为 300）。
+ （可选）**完整文档配置** – 对于文档更新操作，请选择要发送到流中的内容。默认值为 `Default`，这意味着对于每个更改流事件，Amazon DocumentDB 仅发送一个描述所发生更改的增量。有关此字段的更多信息，请参阅 MongoDB Javadocs API 文档中的 [FullDocument](https://mongodb.github.io/mongo-java-driver/3.9/javadoc/com/mongodb/client/model/changestream/FullDocument.html#DEFAULT)。
  + **默认** – Lambda 将仅发送描述所发生更改的部分文档。
  + **UpdateLookup** – Lambda 将发送一个描述所发生更改的增量以及完整文档的副本。

## 创建 Amazon DocumentDB 事件源映射（SDK 或 CLI）
<a name="docdb-api"></a>

要使用 [AWS SDK](https://aws.amazon.com/developer/tools/) 来管理 Amazon DocumentDB 事件源映射，您可以使用以下 API 操作：
+ [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)
+ [ListEventSourceMappings](https://docs.aws.amazon.com/lambda/latest/api/API_ListEventSourceMappings.html)
+ [GetEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_GetEventSourceMapping.html)
+ [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)
+ [DeleteEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_DeleteEventSourceMapping.html)

要使用 AWS CLI 创建事件源映射，请使用 [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令。以下示例使用此命令将名为 `my-function` 的函数映射到一个 Amazon DocumentDB 更改流。事件源由 Amazon 资源名称（ARN）指定，批处理大小为 500，从以 Unix 时间表示的时间戳开始。该命令还指定了 Lambda 用来连接到 Amazon DocumentDB 的 Secrets Manager 密钥。此外，它还包含用于指定要从中读取数据的数据库和集合的 `document-db-event-source-config` 参数。

```
aws lambda create-event-source-mapping --function-name my-function \
    --event-source-arn arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy
    --batch-size 500 \
    --starting-position AT_TIMESTAMP \
    --starting-position-timestamp 1541139109 \
    --source-access-configurations '[{"Type":"BASIC_AUTH","URI":"arn:aws:secretsmanager:us-east-1:123456789012:secret:DocDBSecret-BAtjxi"}]' \
    --document-db-event-source-config '{"DatabaseName":"test_database", "CollectionName": "test_collection"}' \
```

应看到类似如下内容的输出：

```
{
    "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
    "BatchSize": 500,
    "DocumentDBEventSourceConfig": {
        "CollectionName": "test_collection",
        "DatabaseName": "test_database",
        "FullDocument": "Default"
    },
    "MaximumBatchingWindowInSeconds": 0,
    "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy",
    "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function",
    "LastModified": 1541348195.412,
    "LastProcessingResult": "No records processed",
    "State": "Creating",
    "StateTransitionReason": "User action"
}
```

创建后，您可以使用 [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令更新 Amazon DocumentDB 事件源的设置。以下示例会将批处理大小更新为 1000，并将批处理时长更新为 10 秒。对于此命令，您需要有事件源映射的 UUID，您可以通过 `list-event-source-mapping` 命令或 Lambda 控制台检索该值。

```
aws lambda update-event-source-mapping --function-name my-function \
    --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
    --batch-size 1000 \
    --batch-window 10
```

您应看到如下输出：

```
{
    "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
    "BatchSize": 500,
    "DocumentDBEventSourceConfig": {
        "CollectionName": "test_collection",
        "DatabaseName": "test_database",
        "FullDocument": "Default"
    },
    "MaximumBatchingWindowInSeconds": 0,
    "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy",
    "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function",
    "LastModified": 1541359182.919,
    "LastProcessingResult": "OK",
    "State": "Updating",
    "StateTransitionReason": "User action"
}
```

Lambda 会异步更新设置，因此在更新完成之前，您可能无法在输出中看到这些更改。要查看事件源映射的当前设置，请使用 [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) 命令。

```
aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b
```

您应看到如下输出：

```
{
    "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
    "DocumentDBEventSourceConfig": {
        "CollectionName": "test_collection",
        "DatabaseName": "test_database",
        "FullDocument": "Default"
    },
    "BatchSize": 1000,
    "MaximumBatchingWindowInSeconds": 10,
    "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy",
    "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function",
    "LastModified": 1541359182.919,
    "LastProcessingResult": "OK",
    "State": "Enabled",
    "StateTransitionReason": "User action"
}
```

要删除 Amazon DocumentDB 事件源映射，请使用 [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/delete-event-source-mapping.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/delete-event-source-mapping.html) 命令：

```
aws lambda delete-event-source-mapping \
    --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284
```

## 轮询和流的起始位置
<a name="docdb-stream-polling"></a>

请注意，事件源映射创建和更新期间的流轮询最终是一致的。
+ 在事件源映射创建期间，可能需要几分钟才能开始轮询来自流的事件。
+ 在事件源映射更新期间，可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着，如果你指定 `LATEST` 作为流的起始位置，事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件，请将流的起始位置指定为 `TRIM_HORIZON` 或 `AT_TIMESTAMP`。

## 监控 Amazon DocumentDB 事件源
<a name="docdb-monitoring"></a>

为便于您监控 Amazon DocumentDB 事件源，在函数处理完一批记录后，Lambda 将发送 `IteratorAge` 指标。*迭代器期限*是最近事件的时间戳和当前时间戳之间的差值。基本上，`IteratorAge` 指标表示自处理该批处理中最后一条记录后已经过的时间。如果函数当前正在处理新事件，则可使用迭代器期限来估算新记录的添加时间与函数处理该记录的时间之间的延迟。如果 `IteratorAge` 呈上升趋势，则可能说明您的函数存在问题。有关更多信息，请参阅 [将 CloudWatch 指标与 Lambda 结合使用](monitoring-metrics.md)。

Amazon DocumentDB 更改流未经过优化，无法处理事件之间的较大时间间隔。如果 Amazon DocumentDB 事件源在很长一段时间内都没有收到任何事件，那么 Lambda 可能会禁用事件源映射。此时段的长度可能从几周到几个月不等，具体取决于集群规模和其他工作负载。

Lambda 支持最大 6MB 的负载。但是，Amazon DocumentDB 更改流事件的大小可达 16MB。如果更改流尝试向 Lambda 发送大于 6MB 的更改流事件，Lambda 会丢弃该消息并发送 `OversizedRecordCount` 指标。Lambda 将尽力发送所有指标。

# 教程：将 AWS Lambda 与 Amazon DocumentDB 流结合使用
<a name="with-documentdb-tutorial"></a>

 在本教程中，您将创建一个基本的 Lambda 函数，该函数会消耗来自 Amazon DocumentDB（与 MongoDB 兼容）更改流的事件。要完成本教程，您需要经历以下阶段：
+ 设置 Amazon DocumentDB 集群，连接到该集群，并在其上激活更改流。
+ 创建 Lambda 函数，并将 Amazon DocumentDB 集群配置为函数的事件源。
+ 通过将项目插入到 Amazon DocumentDB 数据库中来测试设置。

## 创建 Amazon DocumentDB 集群
<a name="docdb-documentdb-cluster"></a>

1. 打开 [Amazon DocumentDB 控制台](https://console.aws.amazon.com/docdb/home#)。在**集群**下，选择**创建**。

1. 使用以下配置创建集群：
   + 对于**集群类型**，选择**基于实例的集群**。这是默认选项。
   + 在**集群配置**下，确保已选择**引擎版本** 5.0.0。这是默认选项。
   + 在**实例配置**下：
     + 对于**数据库实例类**，选择**内存优化类**。这是默认选项。
     + 对于**常规副本实例数**，请选择 1。
     + 对于**实例类**，使用默认选择。
   + 在**身份验证**下，输入主要用户的用户名，然后选择**自行管理**。输入密码，然后确认密码。
   + 保留所有其他默认设置。

1. 选择**创建集群**。

## 在 Secrets Manager 中创建密钥
<a name="docdb-secret-in-secrets-manager"></a>

在 Amazon DocumentDB 创建您的集群时，请创建一个 AWS Secrets Manager 密钥来存储数据库凭证。在稍后的步骤中创建 Lambda 事件源映射时，您将提供此密钥。

**在 Secrets Manager 中创建密钥**

1. 打开 [Secrets Manager](https://console.aws.amazon.com/secretsmanager/home#) 控制台并选择**存储新密钥**。

1. 对于**选择密钥类型**，请选择以下选项之一：
   + 在**基本详细信息**下：
     + **密钥类型**：用于 Amazon DocumentDB 数据库的凭证
     + 在**凭证**下，输入用于创建 Amazon DocumentDB 集群的相同用户名和密码。
     + **数据库**：选择 Amazon DocumentDB 集群。
     + 选择**下一步**。

1. 对于**配置密钥**，请选择以下选项之一：
   + **密钥名称**：`DocumentDBSecret`
   + 选择**下一步**。

1. 选择**下一步**。

1. 选择**存储**。

1. 刷新控制台以验证 `DocumentDBSecret` 密钥是否成功存储。

记下**密钥 ARN**。您将在后面的步骤中用到它。

## 连接到集群
<a name="docdb-connect-to-cluster"></a>

**使用 AWS CloudShell 连接到 Amazon DocumentDB 集群**

1. 在 Amazon DocumentDB 管理控制台上的**集群**下，找到您创建的集群。单击集群旁边的复选框，选择您的集群。

1. 选择**连接到集群**。将出现 CloudShell **运行命令**屏幕。

1. 在**新环境名称**字段中，输入唯一的名称，例如“test”，然后选择**创建并运行**。

1. 出现提示时请输入密码。当提示符变成 `rs0 [direct: primary] <env-name>>` 时，您成功连接到您的 Amazon DocumentDB 集群。

## 激活更改流
<a name="docdb-activate-change-streams"></a>

在本教程中，您将跟踪对 Amazon DocumentDB 集群中 `docdbdemo` 数据库 `products` 集合的更改。您可以通过激活[更改流](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html)来完成此操作。

**在集群内创建新数据库**

1. 运行以下命令，创建名为 `docdbdemo` 的新数据库：

   ```
   use docdbdemo
   ```

1. 在终端窗口中，使用以下命令将记录插入到 `docdbdemo` 中：

   ```
   db.products.insertOne({"hello":"world"})
   ```

   您应看到类似如下的输出：

   ```
   {
     acknowledged: true,
     insertedId: ObjectId('67f85066ca526410fd531d59')
   }
   ```

1. 接下来，使用以下命令激活 `docdbdemo` 数据库 `products` 集合上的更改流：

   ```
   db.adminCommand({modifyChangeStreams: 1,
       database: "docdbdemo",
       collection: "products", 
       enable: true});
   ```

    应看到类似如下内容的输出：

   ```
   { "ok" : 1, "operationTime" : Timestamp(1680126165, 1) }
   ```

## 创建接口 VPC 端点
<a name="docdb-create-interface-vpc-endpoints"></a>

接下来，创建[接口 VPC 端点](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws)，以确保 Lambda 和 Secrets Manager（稍后用于存储集群访问凭证）能够连接到默认 VPC。

**创建接口 VPC 端点**

1. 打开 [VPC 控制台](https://console.aws.amazon.com/vpc/home#)。在左侧菜单的**虚拟私有云**下，选择**端点**。

1. 选择**创建端点**。使用以下配置创建端点：
   + 对于**名称标签**，输入 `lambda-default-vpc`。
   + 对于**服务类别**，选择 AWS 服务。
   + 对于**服务**，在搜索框中输入 `lambda`。选择格式为 `com.amazonaws.<region>.lambda` 的服务。
   + 对于 **VPC**，选择您的 Amazon DocumentDB 集群所在的 VPC。这通常是[默认 VPC](https://docs.aws.amazon.com/vpc/latest/userguide/default-vpc.html)。
   + 对于**子网**，选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。
   + 对于 **IP 地址类型**，选择 IPv4。
   + 对于**安全组**，选择 Amazon DocumentDB 集群使用的安全组。这通常是 `default` 安全组。
   + 保留所有其他默认设置。
   + 选择**创建端点**。

1. 再次选择**创建端点**。使用以下配置创建端点：
   + 对于**名称标签**，输入 `secretsmanager-default-vpc`。
   + 对于**服务类别**，选择 AWS 服务。
   + 对于**服务**，在搜索框中输入 `secretsmanager`。选择格式为 `com.amazonaws.<region>.secretsmanager` 的服务。
   + 对于 **VPC**，选择您的 Amazon DocumentDB 集群所在的 VPC。这通常是[默认 VPC](https://docs.aws.amazon.com/vpc/latest/userguide/default-vpc.html)。
   + 对于**子网**，选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。
   + 对于 **IP 地址类型**，选择 IPv4。
   + 对于**安全组**，选择 Amazon DocumentDB 集群使用的安全组。这通常是 `default` 安全组。
   + 保留所有其他默认设置。
   + 选择**创建端点**。

 本教程的集群设置部分到此完成。

## 创建执行角色
<a name="docdb-create-the-execution-role"></a>

 在接下来的一组步骤中，您将创建 Lambda 函数。首先，您需要创建执行角色，以向函数授予访问集群的权限。为此，您可以先创建 IAM policy，然后将此策略附加到 IAM 角色。

**创建 IAM policy**

1. 在 IAM 控制台中打开[策略页面](https://console.aws.amazon.com/iam/home#/policies)，然后选择**创建策略**。

1. 选择 **JSON** 选项卡。在以下策略中，将语句最后一行中的 Secrets Manager 资源 ARN 替换为之前的密钥 ARN，然后将策略复制到编辑器中。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "LambdaESMNetworkingAccess",
               "Effect": "Allow",
               "Action": [
                   "ec2:CreateNetworkInterface",
                   "ec2:DescribeNetworkInterfaces",
                   "ec2:DescribeVpcs",
                   "ec2:DeleteNetworkInterface",
                   "ec2:DescribeSubnets",
                   "ec2:DescribeSecurityGroups",
                   "kms:Decrypt"
               ],
               "Resource": "*"
           },
           {
               "Sid": "LambdaDocDBESMAccess",
               "Effect": "Allow",
               "Action": [
                   "rds:DescribeDBClusters",
                   "rds:DescribeDBClusterParameters",
                   "rds:DescribeDBSubnetGroups"
               ],
               "Resource": "*"
           },
           {
               "Sid": "LambdaDocDBESMGetSecretValueAccess",
               "Effect": "Allow",
               "Action": [
                   "secretsmanager:GetSecretValue"
               ],
               "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:DocumentDBSecret"
           }
       ]
   }
   ```

------

1. 选择**下一步：标签**，然后选择**下一步：审核**。

1. 对于 **Name (名称)**，请输入 `AWSDocumentDBLambdaPolicy`。

1. 选择**创建策略**。

**创建 IAM 角色**

1. 在 IAM 控制台中打开[角色页面](https://console.aws.amazon.com/iam/home#/roles)，然后选择**创建角色**。

1. 对于**选择可信实体**，请选择以下选项之一：
   + **可信实体类型**：AWS 服务
   + **服务或使用案例**：Lambda
   + 选择**下一步**。

1. 对于**添加权限**，选择刚刚创建的 `AWSDocumentDBLambdaPolicy` 策略以及 `AWSLambdaBasicExecutionRole`，以向函数授予写入 Amazon CloudWatch Logs 的权限。

1. 选择**下一步**。

1. 对于 **Role name（角色名称）**，输入 `AWSDocumentDBLambdaExecutionRole`。

1. 请选择**创建角色**。

## 创建 Lambda 函数
<a name="docdb-create-the-lambda-function"></a>

本教程使用 Python 3.14 运行时系统，但我们还提供了适用于其他运行时系统的示例代码文件。您可以选择以下框中的选项卡，查看适用于您感兴趣的运行时系统的代码。

代码接收 Amazon DocumentDB 事件输入并对其所包含的消息进行处理。

**创建 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions page](https://console.aws.amazon.com/lambda/home#/functions)（函数页面）。

1. 选择 **Create function**（创建函数）。

1. 选择**从头开始编写**。

1. 在**基本信息**中，执行以下操作：

   1. 对于**函数名称**，输入 `ProcessDocumentDBRecords`。

   1. 对于**运行时系统**，选择 **Python 3.14**。

   1. 对于**架构**，选择 **x86\$164**。

1. 在**更改默认执行角色**选项卡中，执行以下操作：

   1. 展开选项卡，然后选择**使用现有角色**。

   1. 选择您之前创建的 `AWSDocumentDBLambdaExecutionRole`。

1. 选择**创建函数**。

**要部署函数代码**

1. 在下框中选择 **Python** 选项卡并复制代码。

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 .NET 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   using Amazon.Lambda.Core;
   using System.Text.Json;
   using System;
   using System.Collections.Generic;
   using System.Text.Json.Serialization;
   //Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
   [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
   
   namespace LambdaDocDb;
   
   public class Function
   {
       
        /// <summary>
       /// Lambda function entry point to process Amazon DocumentDB events.
       /// </summary>
       /// <param name="event">The Amazon DocumentDB event.</param>
       /// <param name="context">The Lambda context object.</param>
       /// <returns>A string to indicate successful processing.</returns>
       public string FunctionHandler(Event evnt, ILambdaContext context)
       {
           
           foreach (var record in evnt.Events)
           {
               ProcessDocumentDBEvent(record, context);
           }
   
           return "OK";
       }
   
        private void ProcessDocumentDBEvent(DocumentDBEventRecord record, ILambdaContext context)
       {
           
           var eventData = record.Event;
           var operationType = eventData.OperationType;
           var databaseName = eventData.Ns.Db;
           var collectionName = eventData.Ns.Coll;
           var fullDocument = JsonSerializer.Serialize(eventData.FullDocument, new JsonSerializerOptions { WriteIndented = true });
   
           context.Logger.LogLine($"Operation type: {operationType}");
           context.Logger.LogLine($"Database: {databaseName}");
           context.Logger.LogLine($"Collection: {collectionName}");
           context.Logger.LogLine($"Full document:\n{fullDocument}");
       }
   
   
   
       public class Event
       {
           [JsonPropertyName("eventSourceArn")]
           public string EventSourceArn { get; set; }
   
           [JsonPropertyName("events")]
           public List<DocumentDBEventRecord> Events { get; set; }
   
           [JsonPropertyName("eventSource")]
           public string EventSource { get; set; }
       }
   
       public class DocumentDBEventRecord
       {
           [JsonPropertyName("event")]
           public EventData Event { get; set; }
       }
   
       public class EventData
       {
           [JsonPropertyName("_id")]
           public IdData Id { get; set; }
   
           [JsonPropertyName("clusterTime")]
           public ClusterTime ClusterTime { get; set; }
   
           [JsonPropertyName("documentKey")]
           public DocumentKey DocumentKey { get; set; }
   
           [JsonPropertyName("fullDocument")]
           public Dictionary<string, object> FullDocument { get; set; }
   
           [JsonPropertyName("ns")]
           public Namespace Ns { get; set; }
   
           [JsonPropertyName("operationType")]
           public string OperationType { get; set; }
       }
   
       public class IdData
       {
           [JsonPropertyName("_data")]
           public string Data { get; set; }
       }
   
       public class ClusterTime
       {
           [JsonPropertyName("$timestamp")]
           public Timestamp Timestamp { get; set; }
       }
   
       public class Timestamp
       {
           [JsonPropertyName("t")]
           public long T { get; set; }
   
           [JsonPropertyName("i")]
           public int I { get; set; }
       }
   
       public class DocumentKey
       {
           [JsonPropertyName("_id")]
           public Id Id { get; set; }
       }
   
       public class Id
       {
           [JsonPropertyName("$oid")]
           public string Oid { get; set; }
       }
   
       public class Namespace
       {
           [JsonPropertyName("db")]
           public string Db { get; set; }
   
           [JsonPropertyName("coll")]
           public string Coll { get; set; }
       }
   }
   ```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Go 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   package main
   
   import (
   	"context"
   	"encoding/json"
   	"fmt"
   
   	"github.com/aws/aws-lambda-go/lambda"
   )
   
   type Event struct {
   	Events []Record `json:"events"`
   }
   
   type Record struct {
   	Event struct {
   		OperationType string `json:"operationType"`
   		NS            struct {
   			DB   string `json:"db"`
   			Coll string `json:"coll"`
   		} `json:"ns"`
   		FullDocument interface{} `json:"fullDocument"`
   	} `json:"event"`
   }
   
   func main() {
   	lambda.Start(handler)
   }
   
   func handler(ctx context.Context, event Event) (string, error) {
   	fmt.Println("Loading function")
   	for _, record := range event.Events {
   		logDocumentDBEvent(record)
   	}
   
   	return "OK", nil
   }
   
   func logDocumentDBEvent(record Record) {
   	fmt.Printf("Operation type: %s\n", record.Event.OperationType)
   	fmt.Printf("db: %s\n", record.Event.NS.DB)
   	fmt.Printf("collection: %s\n", record.Event.NS.Coll)
   	docBytes, _ := json.MarshalIndent(record.Event.FullDocument, "", "  ")
   	fmt.Printf("Full document: %s\n", string(docBytes))
   }
   ```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Java 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   import java.util.List;
   import java.util.Map;
   
   import com.amazonaws.services.lambda.runtime.Context;
   import com.amazonaws.services.lambda.runtime.RequestHandler;
   
   public class Example implements RequestHandler<Map<String, Object>, String> {
   
       @SuppressWarnings("unchecked")
       @Override
       public String handleRequest(Map<String, Object> event, Context context) {
           List<Map<String, Object>> events = (List<Map<String, Object>>) event.get("events");
           for (Map<String, Object> record : events) {
               Map<String, Object> eventData = (Map<String, Object>) record.get("event");
               processEventData(eventData);
           }
   
           return "OK";
       }
   
       @SuppressWarnings("unchecked")
       private void processEventData(Map<String, Object> eventData) {
           String operationType = (String) eventData.get("operationType");
           System.out.println("operationType: %s".formatted(operationType));
   
           Map<String, Object> ns = (Map<String, Object>) eventData.get("ns");
   
           String db = (String) ns.get("db");
           System.out.println("db: %s".formatted(db));
           String coll = (String) ns.get("coll");
           System.out.println("coll: %s".formatted(coll));
   
           Map<String, Object> fullDocument = (Map<String, Object>) eventData.get("fullDocument");
           System.out.println("fullDocument: %s".formatted(fullDocument));
       }
   
   }
   ```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 JavaScript 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   console.log('Loading function');
   exports.handler = async (event, context) => {
       event.events.forEach(record => {
           logDocumentDBEvent(record);
       });
       return 'OK';
   };
   
   const logDocumentDBEvent = (record) => {
       console.log('Operation type: ' + record.event.operationType);
       console.log('db: ' + record.event.ns.db);
       console.log('collection: ' + record.event.ns.coll);
       console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2));
   };
   ```
使用 TypeScript 将 Amazon DocumentDB 事件与 Lambda 结合使用  

   ```
   import { DocumentDBEventRecord, DocumentDBEventSubscriptionContext } from 'aws-lambda';
   
   console.log('Loading function');
   
   export const handler = async (
     event: DocumentDBEventSubscriptionContext,
     context: any
   ): Promise<string> => {
     event.events.forEach((record: DocumentDBEventRecord) => {
       logDocumentDBEvent(record);
     });
     return 'OK';
   };
   
   const logDocumentDBEvent = (record: DocumentDBEventRecord): void => {
     console.log('Operation type: ' + record.event.operationType);
     console.log('db: ' + record.event.ns.db);
     console.log('collection: ' + record.event.ns.coll);
     console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2));
   };
   ```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 PHP 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   <?php
   
   require __DIR__.'/vendor/autoload.php';
   
   use Bref\Context\Context;
   use Bref\Event\Handler;
   
   class DocumentDBEventHandler implements Handler
   {
       public function handle($event, Context $context): string
       {
   
           $events = $event['events'] ?? [];
           foreach ($events as $record) {
               $this->logDocumentDBEvent($record['event']);
           }
           return 'OK';
       }
   
       private function logDocumentDBEvent($event): void
       {
           // Extract information from the event record
   
           $operationType = $event['operationType'] ?? 'Unknown';
           $db = $event['ns']['db'] ?? 'Unknown';
           $collection = $event['ns']['coll'] ?? 'Unknown';
           $fullDocument = $event['fullDocument'] ?? [];
   
           // Log the event details
   
           echo "Operation type: $operationType\n";
           echo "Database: $db\n";
           echo "Collection: $collection\n";
           echo "Full document: " . json_encode($fullDocument, JSON_PRETTY_PRINT) . "\n";
       }
   }
   return new DocumentDBEventHandler();
   ```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Python 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   import json
   
   def lambda_handler(event, context):
       for record in event.get('events', []):
           log_document_db_event(record)
       return 'OK'
   
   def log_document_db_event(record):
       event_data = record.get('event', {})
       operation_type = event_data.get('operationType', 'Unknown')
       db = event_data.get('ns', {}).get('db', 'Unknown')
       collection = event_data.get('ns', {}).get('coll', 'Unknown')
       full_document = event_data.get('fullDocument', {})
   
       print(f"Operation type: {operation_type}")
       print(f"db: {db}")
       print(f"collection: {collection}")
       print("Full document:", json.dumps(full_document, indent=2))
   ```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Ruby 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   require 'json'
   
   def lambda_handler(event:, context:)
     event['events'].each do |record|
       log_document_db_event(record)
     end
     'OK'
   end
   
   def log_document_db_event(record)
     event_data = record['event'] || {}
     operation_type = event_data['operationType'] || 'Unknown'
     db = event_data.dig('ns', 'db') || 'Unknown'
     collection = event_data.dig('ns', 'coll') || 'Unknown'
     full_document = event_data['fullDocument'] || {}
   
     puts "Operation type: #{operation_type}"
     puts "db: #{db}"
     puts "collection: #{collection}"
     puts "Full document: #{JSON.pretty_generate(full_document)}"
   end
   ```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Rust 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
   use aws_lambda_events::{
       event::documentdb::{DocumentDbEvent, DocumentDbInnerEvent},
      };
   
   
   // Built with the following dependencies:
   //lambda_runtime = "0.11.1"
   //serde_json = "1.0"
   //tokio = { version = "1", features = ["macros"] }
   //tracing = { version = "0.1", features = ["log"] }
   //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
   //aws_lambda_events = "0.15.0"
   
   async fn function_handler(event: LambdaEvent<DocumentDbEvent>) ->Result<(), Error> {
       
       tracing::info!("Event Source ARN: {:?}", event.payload.event_source_arn);
       tracing::info!("Event Source: {:?}", event.payload.event_source);
     
       let records = &event.payload.events;
      
       if records.is_empty() {
           tracing::info!("No records found. Exiting.");
           return Ok(());
       }
   
       for record in records{
           log_document_db_event(record);
       }
   
       tracing::info!("Document db records processed");
   
       // Prepare the response
       Ok(())
   
   }
   
   fn log_document_db_event(record: &DocumentDbInnerEvent)-> Result<(), Error>{
       tracing::info!("Change Event: {:?}", record.event);
       
       Ok(())
   
   }
   
   #[tokio::main]
   async fn main() -> Result<(), Error> {
       tracing_subscriber::fmt()
       .with_max_level(tracing::Level::INFO)
       .with_target(false)
       .without_time()
       .init();
   
       let func = service_fn(function_handler);
       lambda_runtime::run(func).await?;
       Ok(())
       
   }
   ```

------

1. 在 Lambda 控制台的**代码源**窗格中，将代码粘贴到代码编辑器中，替换 Lambda 创建的代码。

1. 在**部署**部分，选择**部署**以更新函数的代码：  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## 创建 Lambda 事件源映射
<a name="docdb-create-the-lambda-event-source-mapping"></a>

 创建事件源映射，将 Amazon DocumentDB 更改流与 Lambda 函数相关联。创建此事件源映射后，AWS Lambda 即开始轮询该流。

**创建事件源映射**

1. 在 Lambda 控制台中打开[函数页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您之前创建的 `ProcessDocumentDBRecords` 函数。

1. 选择**配置**选项卡，然后从左侧菜单中选择**触发器**。

1. 选择**添加触发器**。

1. 在**触发器配置**下，为源选择 **Amazon DocumentDB**。

1. 使用以下配置创建事件源映射：
   + **Amazon DocumentDB 集群**：选择之前创建的集群。
   + **数据库名称**：docdbdemo
   + **集合名称**：产品
   + **批处理大小**：1
   + **起始位置**：最新
   + **身份验证**：BASIC\$1AUTH
   + **Secrets Manager 密钥**：选择 Amazon DocumentDB 集群的密钥。其名称类似于 `rds!cluster-12345678-a6f0-52c0-b290-db4aga89274f`。
   + **批处理时段**：1
   + **完整文档配置**：UpdateLookup

1. 选择**添加**。创建事件源映射可能需要花费几分钟的时间。

## 测试函数
<a name="docdb-test-insert"></a>

等待事件源映射达到**已启用**状态。这个过程可能需要几分钟。然后，通过插入、更新和删除数据库记录来测试端到端设置。开始前的准备工作：

1. 在 CloudShell 环境中[重新连接到 Amazon DocumentDB 集群](#docdb-connect-to-cluster)。

1. 运行以下命令以确保使用的是 `docdbdemo` 数据库：

   ```
   use docdbdemo
   ```

### 插入记录
<a name="docdb-test-insert"></a>

在 `docdbdemo` 数据库的 `products` 集合中插入记录：

```
db.products.insertOne({"name":"Pencil", "price": 1.00})
```

通过[查看 CloudWatch Logs](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-console) 来验证函数是否成功处理该事件。您会看到如下日志条目：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-insert-log.png)


### 更新记录
<a name="docdb-test-update"></a>

使用以下命令更新您刚刚插入的记录：

```
db.products.updateOne(
    { "name": "Pencil" },
    { $set: { "price": 0.50 }}
)
```

通过[查看 CloudWatch Logs](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-console) 来验证函数是否成功处理该事件。您会看到如下日志条目：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-update-log.png)


### 删除记录
<a name="docdb-test-delete"></a>

使用以下命令删除您刚刚更新的记录：

```
db.products.deleteOne( { "name": "Pencil" } )
```

通过[查看 CloudWatch Logs](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-console) 来验证函数是否成功处理该事件。您会看到如下日志条目：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-delete-log.png)


## 问题排查
<a name="docdb-lambda-troubleshooting"></a>

如果您在函数的 CloudWatch 日志中没有看到任何数据库事件，请检查以下各项：
+ 确保 Lambda 事件源映射（也称为触发器）处于**已启用**状态。创建事件源映射可能需要几分钟时间。
+ 如果事件源映射**已启用**，但您仍然无法在 CloudWatch 中看到数据库事件：
  + 确保事件源映射中的**数据库名称**设置为 `docdbdemo`。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-trigger.png)
  + 检查事件源映射**上次处理结果**字段中是否显示以下消息：“问题：连接错误。您的 VPC 必须能够连接到 Lambda 和 STS，以及 Secrets Manager（如果需要身份验证）。” 如果看到此错误，请确保您[已创建 Lambda 和 Secrets Manager VPC 接口端点](#docdb-create-interface-vpc-endpoints)，并且这些端点使用与 Amazon DocumentDB 集群相同的 VPC 和子网。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-lastprocessingresult.png)

## 清除资源
<a name="docdb-cleanup"></a>

 除非您想要保留为本教程创建的资源，否则可立即将其删除。通过删除您不再使用的 AWS 资源，可防止您的 AWS 账户 产生不必要的费用。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择**删除**。

**删除执行角色**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择您创建的执行角色。

1. 选择**删除**。

1. 在文本输入字段中输入角色名称，然后选择**删除**。

**删除 VPC 端点**

1. 打开 [VPC 控制台](https://console.aws.amazon.com/vpc/home#)。在左侧菜单的**虚拟私有云**下，选择**端点**。

1. 选择您创建的端点。

1. 选择**操作**、**删除 VPC 端点**。

1. 在文本输入字段中输入 **delete**。

1. 选择**删除**。

**删除 Amazon DocumentDB 集群**

1. 打开 [Amazon DocumentDB 控制台](https://console.aws.amazon.com/docdb/home#)。

1. 选择您为本教程创建的 Amazon DocumentDB 集群，并禁用删除保护。

1. 在主**集群**页面中，再次选择 Amazon DocumentDB 集群。

1. 依次选择**操作**和**删除**。

1. 对于**创建最终集群快照**，选择**否**。

1. 在文本输入字段中输入 **delete**。

1. 选择**删除**。

**在 Secrets Manager 中删除密钥**

1. 打开 [Secrets Manager 控制台](https://console.aws.amazon.com/secretsmanager/home#)。

1. 选择您为本教程创建的密钥。

1. 依次选择**操作**、**删除密钥**。

1. 选择**计划删除**。