将 AWS Lambda 与 Amazon DynamoDB 结合使用 - AWS Lambda

将 AWS Lambda 与 Amazon DynamoDB 结合使用

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

您可以使用 AWS Lambda 函数来处理 Amazon DynamoDB Streams中的记录。使用 DynamoDB Streams,每次更新 DynamoDB 表时,您都可以触发 Lambda 函数以执行其他工作。

轮询和批处理流

Lambda 以每秒 4 次的基本频率轮询 DynamoDB 流中的分区来获取记录。如果记录可用,Lambda 会调用函数并等待结果。如果处理成功,Lambda 会恢复轮询,直到其收到更多记录。

默认情况下,Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录,则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置 batching window(批处理时段),让事件源缓冲最多五分钟的记录。调用函数前,Lambda 会持续从事件源中读取记录,直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息,请参阅 批处理行为

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性

Lambda 在发送下次批处理之前不会等待任何配置的扩展完成。换句话说,扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何并发设置或限制,可能会导致节流问题。要检测这是否是潜在问题,请监控函数并检查所显示的并发指标是否高于事件源映射的预期。由于调用间隔时间较短,Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。

配置 ParallelizationFactor 设置以同时使用多个 Lambda 调用处理 DynamoDB 流的一个分片。您可以指定 Lambda 通过从 1(默认值)到 10 的并行化因子从分区中轮询的并发批次数。例如,假设您将 ParallelizationFactor 设置为 2,则最多可以有 200 个并发 Lambda 调用来处理 100 个 DynamoDB 流分片(但您可能实际上会看到不同的 ConcurrentExecutions 指标值)。这有助于在数据量不稳定并且 IteratorAge 较高时纵向扩展处理吞吐量。增加每个分片的并发批次数后,Lambda 仍然可以确保项目(分区和排序键)级别的顺序处理。

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON

DynamoDB Streams 中的分片同时读取器

对于作为非全局表的单区域表,您可以设计最多两个 Lambda 函数来同时从同一个 DynamoDB Streams 分片读取数据。超过此限制会导致请求被拒。对于全局表,我们建议您将并行函数的数量限制为一个,以避免请求节流。

示例事件

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}