使用 Lambda 处理 Amazon DocumentDB 事件 - AWS Lambda

使用 Lambda 处理 Amazon DocumentDB 事件

您可以通过将 Amazon DocumentDB 集群配置为事件源,从而使用 Lambda 函数来处理 Amazon DocumentDB(与 MongoDB 兼容)更改流中的事件。然后,您可以在 Amazon DocumentDB 集群每次发生数据更改时调用 Lambda 函数,从而实现事件驱动型工作负载的自动化。

注意

Lambda 仅支持 Amazon DocumentDB 4.0 和 5.0 版本。Lambda 不支持 3.6 版本。

此外,对于事件源映射,Lambda 仅支持基于实例的集群和区域性集群。Lambda 不支持 弹性集群全球集群。当使用 Lambda 作为客户端连接到 Amazon DocumentDB 时,此限制不适用。Lambda 可以连接到所有集群类型来执行 CRUD 操作。

Lambda 按照事件到达的顺序依次处理 Amazon DocumentDB 更改流中的事件。因此,您的函数一次只能处理一条来自 Amazon DocumentDB 的并发调用。要监控函数,您可以跟踪其并发指标

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性

Amazon DocumentDB 事件示例

{ "eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03", "events": [ { "event": { "_id": { "_data": "0163eeb6e7000000090100000009000041e1" }, "clusterTime": { "$timestamp": { "t": 1676588775, "i": 9 } }, "documentKey": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" } }, "fullDocument": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" }, "anyField": "sampleValue" }, "ns": { "db": "test_database", "coll": "test_collection" }, "operationType": "insert" } } ], "eventSource": "aws:docdb" }

有关此示例中的事件及其形状的更多信息,请参阅 MongoDB 文档网站上的更改事件

先决条件和权限

将 Amazon DocumentDB 用作 Lambda 函数的事件源之前,请注意以下先决条件。您必须:

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

配置网络安全

要通过事件源映射向 Lambda 提供对 Amazon DocumentDB 的完全访问权限,集群必须使用公有端点(公有 IP 地址),或者您必须提供对您在其中创建了集群的 Amazon VPC 的访问权限。

将 Amazon DocumentDB 与 Lambda 配合使用时,我们建议您创建 AWS PrivateLinkVPC 端点,并向函数提供对 Amazon VPC 中资源的访问权限。

创建端点以提供对以下资源的访问权限:

  • Lambda:为 Lambda 服务主体创建端点。

  • AWS STS:为 AWS STS 创建端点,以便服务主体代您代入角色。

  • Secrets Manager:如果集群使用 Secrets Manager 来存储凭证,请为 Secrets Manager 创建端点。

也可以在 Amazon VPC 中的每个公有子网上配置 NAT 网关。有关更多信息,请参阅 为连接到 VPC 的 Lambda 函数启用互联网访问权限

为 Amazon DocumentDB 创建事件源映射时,Lambda 会检查为 Amazon VPC 配置的子网和安全组是否已经存在弹性网络接口(ENI)。如果 Lambda 发现现有 ENI,则会尝试重用这些 ENI。否则,Lambda 会创建新的 ENI 来连接到事件源并调用函数。

注意

Lambda 函数始终在 Lambda 服务拥有的 Amazon VPC 中运行。函数的 VPC 配置不会影响事件源映射。只有事件源的网络配置才能决定 Lambda 连接到事件源的方式。

为包含集群的 Amazon VPC 配置安全组。默认情况下,Amazon DocumentDB 使用以下端口:27017

  • 入站规则:允许与事件源关联安全组的默认集群端口的所有流量。

  • 出站规则:允许所有目标的端口 443 上的所有流量。允许与事件源关联安全组的默认集群的所有流量。

  • Amazon VPC 端点入站规则:如果您正在使用 Amazon VPC 端点,则与 Amazon VPC 端点关联的安全组,必须允许来自集群安全组的端口 443 上的入站流量。

如果集群使用身份验证,您还可以限制 Secrets Manager 端点的端点策略。要调用 Secrets Manager API,Lambda 会使用函数角色而非 Lambda 服务主体。

例 VPC 端点策略:Secrets Manager 端点
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "arn:aws::iam::123456789012:role/my-role" ] }, "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret" } ] }

当您使用 Amazon VPC 端点时,AWS 会使用端点的弹性网络接口(ENI)路由 API 调用来调用函数。Lambda 服务主体需要针对使用这些 ENI 的任何函数调用 lambda:InvokeFunction。默认情况下,Amazon VPC 端点具有开放的 IAM 策略,允许对资源进行广泛访问。要在生产环境中配合使用 Amazon DocumentDB 和 Lambda,您可以将这些策略限制为仅允许特定的主体访问特定的角色和函数。

例 端点策略:Lambda 端点
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "arn:aws::lambda:us-west-2:123456789012:function:my-function" } ] }

此外,对于事件源映射,如果您希望与 Lambda 集成的资源在 AWS 账户中部署,则 Lambda 服务主体必须执行 sts:AssumeRole 才能代入使用弹性网络接口(ENI)的角色。

例 端点策略:AWS STS 端点
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "arn:aws::iam::123456789012:role/my-role" } ] }
警告

将端点策略限制为仅允许来自组织内部的 API 调用,这将阻止事件源映射正常运行。

创建 Amazon DocumentDB 事件源映射(控制台)

要配置 Lambda 函数以从 Amazon DocumentDB 集群更改流中读取数据,请创建一个 事件源映射。这一部分介绍了如何从 Lambda 控制台执行此操作。有关 AWS SDK 和 AWS CLI 说明,请参阅 创建 Amazon DocumentDB 事件源映射(SDK 或 CLI)

要创建 Amazon DocumentDB 事件源映射(控制台)
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. 触发器配置下的下拉列表中,选择 DocumentDB

  5. 配置必填选项,然后选择 Add(添加)。

Lambda 支持 Amazon DocumentDB 事件源的以下选项:

  • DocumentDB 集群 – 选择一个 Amazon DocumentDB 集群。

  • 激活触发器 – 选择是否要立即激活触发器。如果选中此复选框,则在创建事件源映射后,您的函数会立即开始接收来自指定 Amazon DocumentDB 更改流的流量。我们建议取消选中此复选框,以便在禁用状态下创建事件源映射以进行测试。创建事件源映射后,您可以随时将其激活。

  • 数据库名称 – 输入集群中要使用的数据库的名称。

  • (可选)集合名称 – 输入数据库中要使用的集合的名称。如果您未指定集合,Lambda 将侦听来自数据库中每个集合的所有事件。

  • 批处理大小 – 设置要在单个批处理中检索的最大消息数,最高为 10000。默认批处理大小为 100。

  • 开始位置 – 选择将从流中开始读取记录的位置。

    • 最新 – 仅处理添加到流中的新记录。您的函数仅在 Lambda 完成创建事件源后才会开始处理记录。这意味着在成功创建事件源之前,某些记录可能会被丢弃。

    • 最早:处理流中的所有记录。Lambda 使用集群的日志保留期来确定从哪里开始读取事件。具体而言,Lambda 将从 current_time - log_retention_duration 开始读取事件。更改流必须在此时间戳之前已经处于活动状态,才能确保 Lambda 正确读取所有事件。

    • 在时间戳处:处理从特定时间开始的记录。更改流必须在指定的时间戳之前已经处于活动状态,才能确保 Lambda 正确读取所有事件。

  • 身份验证 – 选择访问集群中的代理时将使用的身份验证方法。

    • BASIC_AUTH – 使用基本身份验证时,您必须提供包含访问集群凭据所需凭证的 Secrets Manager 密钥。

  • Secrets Manager 密钥 – 选择包含访问 Amazon DocumentDB 集群所需身份验证详细信息(用户名和密码)的 Secrets Manager 密钥。

  • (可选)批处理时长 – 在调用函数之前收集记录的最长时间(以秒为单位,最大为 300)。

  • (可选)完整文档配置 – 对于文档更新操作,请选择要发送到流中的内容。默认值为 Default,这意味着对于每个更改流事件,Amazon DocumentDB 仅发送一个描述所发生更改的增量。有关此字段的更多信息,请参阅 MongoDB Javadocs API 文档中的 FullDocument

    • 默认 – Lambda 将仅发送描述所发生更改的部分文档。

    • UpdateLookup – Lambda 将发送一个描述所发生更改的增量以及完整文档的副本。

创建 Amazon DocumentDB 事件源映射(SDK 或 CLI)

要使用 AWS SDK 来管理 Amazon DocumentDB 事件源映射,您可以使用以下 API 操作:

要使用 AWS CLI 创建事件源映射,请使用 create-event-source-mapping 命令。以下示例使用此命令将名为 my-function 的函数映射到一个 Amazon DocumentDB 更改流。事件源由 Amazon 资源名称(ARN)指定,批处理大小为 500,从以 Unix 时间表示的时间戳开始。该命令还指定了 Lambda 用来连接到 Amazon DocumentDB 的 Secrets Manager 密钥。此外,它还包含用于指定要从中读取数据的数据库和集合的 document-db-event-source-config 参数。

aws lambda create-event-source-mapping --function-name my-function \ --event-source-arn arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy --batch-size 500 \ --starting-position AT_TIMESTAMP \ --starting-position-timestamp 1541139109 \ --source-access-configurations '[{"Type":"BASIC_AUTH","URI":"arn:aws:secretsmanager:us-east-1:123456789012:secret:DocDBSecret-BAtjxi"}]' \ --document-db-event-source-config '{"DatabaseName":"test_database", "CollectionName": "test_collection"}' \

应看到类似如下内容的输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541348195.412, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action" }

创建后,您可以使用 update-event-source-mapping 命令更新 Amazon DocumentDB 事件源的设置。以下示例会将批处理大小更新为 1000,并将批处理时长更新为 10 秒。对于此命令,您需要有事件源映射的 UUID,您可以通过 list-event-source-mapping 命令或 Lambda 控制台检索该值。

aws lambda update-event-source-mapping --function-name my-function \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --batch-size 1000 \ --batch-window 10

您应看到如下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "MaximumBatchingWindowInSeconds": 0, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541359182.919, "LastProcessingResult": "OK", "State": "Updating", "StateTransitionReason": "User action" }

Lambda 会异步更新设置,因此在更新完成之前,您可能无法在输出中看到这些更改。要查看事件源映射的当前设置,请使用 get-event-source-mapping 命令。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

您应看到如下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "DocumentDBEventSourceConfig": { "CollectionName": "test_collection", "DatabaseName": "test_database", "FullDocument": "Default" }, "BatchSize": 1000, "MaximumBatchingWindowInSeconds": 10, "EventSourceArn": "arn:aws:rds:us-west-2:123456789012:cluster:privatecluster7de2-epzcyvu4pjoy", "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:my-function", "LastModified": 1541359182.919, "LastProcessingResult": "OK", "State": "Enabled", "StateTransitionReason": "User action" }

要删除 Amazon DocumentDB 事件源映射,请使用 delete-event-source-mapping 命令:

aws lambda delete-event-source-mapping \ --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

监控 Amazon DocumentDB 事件源

为便于您监控 Amazon DocumentDB 事件源,在函数处理完一批记录后,Lambda 将发送 IteratorAge 指标。迭代器期限是最近事件的时间戳和当前时间戳之间的差值。基本上,IteratorAge 指标表示自处理该批处理中最后一条记录后已经过的时间。如果函数当前正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理该记录的时间之间的延迟。如果 IteratorAge 呈上升趋势,则可能说明您的函数存在问题。有关更多信息,请参阅 查看 Lambda 函数的指标

Amazon DocumentDB 更改流未经过优化,无法处理事件之间的较大时间间隔。如果 Amazon DocumentDB 事件源在很长一段时间内都没有收到任何事件,那么 Lambda 可能会禁用事件源映射。此时段的长度可能从几周到几个月不等,具体取决于集群规模和其他工作负载。

Lambda 支持最大 6MB 的负载。但是,Amazon DocumentDB 更改流事件的大小可达 16MB。如果更改流尝试向 Lambda 发送大于 6MB 的更改流事件,Lambda 会丢弃该消息并发送 OversizedRecordCount 指标。Lambda 将尽力发送所有指标。