将 DynamoDB 与 Amazon EventBridge 集成
Amazon DynamoDB 提供用于捕获变更数据的 DynamoDB Streams,可帮助捕获 DynamoDB 表中的项目级更改。DynamoDB Streams 可以调用 Lambda 函数来处理这些更改,从而支持与其他服务和应用程序进行事件驱动型集成。DynamoDB Streams 还支持筛选,可实现高效且有针对性的事件处理。
DynamoDB Streams 支持每个分片最多同时有两个使用者,并支持通过 Lambda 事件筛选功能进行筛选,以便仅处理符合特定条件的项目。有些客户可能要求支持两个以上使用者。其他人可能需要在处理更改事件之前扩充事件数据,或者使用更高级的筛选和路由功能。
将 DynamoDB 与 EventBridge 集成可以满足这些要求。
Amazon EventBridge 是一项无服务器服务,使用事件将应用程序组件连接在一起,可让您更轻松地构建可扩展的事件驱动型应用程序。EventBridge 通过 EventBridge Pipes 提供与 Amazon DynamoDB 的原生集成,可帮助将数据从 DynamoDB 轻松传输到 EventBridge 总线。然后,该总线可以通过一组规则和目标将事件传输到多个应用程序和服务。
工作方式
通过将 DynamoDB 与 EventBridge 管道集成,可以使用 DynamoDB Streams 在 DynamoDB 表中捕获按时间排序的项目级更改序列。以这种方式捕获的每条记录都包含表中修改的数据。
EventBridge 管道从 DynamoDB Streams 接收事件并将其路由到诸如 EventBridge 总线 [事件总线是接收事件并将其传送到目的地(也称为目标)的路由器] 之类的目标。交付取决于哪些规则与事件内容相匹配。该管道还能够筛选特定事件,以及在将事件数据发送到目标之前对其进行扩充。
虽然 EventBridge 支持多种目标类型,但在实施扇出设计时,常见的选择是使用 Lambda 函数作为目标。以下示例演示了如何与 Lambda 函数目标进行集成。
通过控制台创建集成
按照以下步骤,通过 AWS Management Console来创建集成。
-
按照《DynamoDB 开发人员指南》的启用流部分中的步骤,在源表上启用 DynamoDB Streams。如果源表上已启用 DynamoDB Streams,请验证当前使用者是否少于两个。使用者可能是 Lambda 函数、DynamoDB 全局表、与 Amazon OpenSearch Service 进行零 ETL 集成的 Amazon DynamoDB,或者是直接从流中读取数据(例如通过 DynamoDB Streams Kinesis 适配器)的应用程序。
-
按照《EventBridge 用户指南》中创建 Amazon EventBridge 事件总线部分中的步骤,创建 EventBridge 事件总线。
-
创建事件总线后,启用架构发现。
-
-
按照《EventBridge 用户指南》中创建 Amazon EventBridge 管道部分中的步骤,创建 EventBridge 管道。
-
配置源时,在源字段中选择 DynamoDB,然后在 DynamoDB Streams 字段中选择源表流的名称。
-
配置目标时,在目标服务字段中选择 EventBridge 事件总线,在事件总线作为目标字段中,选择在步骤 2 中创建的事件总线。
-
-
向源 DynamoDB 表中写入项目示例以触发事件。这将允许 EventBridge 从该项目示例推断出架构。此架构可用于创建路由事件的规则。例如,如果您要实施涉及重载属性的设计模式,可能需要根据排序键的值触发不同的规则。有关如何向 DynamoDB 写入项目的详细信息,请参阅《DynamoDB 开发人员指南》中的使用项目和属性部分。
-
按照《Lambda 开发人员指南》中使用 Python 构建 Lambda 函数部分中的步骤,创建一个用作目标的 Python Lambda 函数示例。在创建函数时,可以使用下面的代码示例来演示集成。调用时,它将打印与可以在 CloudWatch 日志中查看的事件一起接收的
NewImage
和OldImage
。import json def lambda_handler(event, context): dynamodb = event.get('detail', {}).get('dynamodb', {}) new_image = dynamodb.get('NewImage') old_image = dynamodb.get('OldImage') if new_image: print("NewImage:", json.dumps(new_image, indent=2)) if old_image: print("OldImage:", json.dumps(old_image, indent=2)) return {'statusCode': 200, 'body': json.dumps(event)}
-
按照《EventBridge 用户指南》中,介绍如何对事件做出发应的创建规则部分中的步骤,创建可将事件路由到新 Lambda 函数的 EventBridge 规则。
-
定义规则详细信息时,选择您在步骤 2 中创建的事件总线的名称作为事件总线。
-
构建事件模式时,请遵循现有架构指南。在这里,您可以为事件选择发现的架构注册表和已发现的架构。这将允许您配置特定于您的用例的事件模式,以便仅路由与特定属性匹配的消息。例如,如果您希望只匹配 SK 以
“user#”
开头的 DynamoDB 项,则可以使用如下配置。 -
根据架构完成模式设计后,单击以 JSON 格式生成事件模式。相反,如果要匹配 DynamoDB Streams 中显示的所有事件,请使用以下 JSON 生成事件模式。
{ "source": ["aws.dynamodb"] }
-
选择目标时,请按照 AWS 服务指南操作。在“选择目标”字段中,选择“Lambda 函数”。在“函数”字段中,选择您在步骤 5 中创建的 Lambda 函数。
-
-
现在,您可以按照《EventBridge 用户指南》中在事件总线中启动或停止架构发现部分中的步骤,停止在事件总线中发现架构。
-
在源 DynamoDB 表中写入第二个项目示例以触发事件。验证是否在每个步骤中成功处理了该事件。
-
按照《EventBridge 用户指南》中监控 Amazon EventBridge部分中的步骤,查看事件总线的 CloudWatch 指标 PutEventsApproximateSuccessCount。
-
按照《Lambda 开发人员指南》中监控 Lambda 函数和排查其故障部分中的步骤,查看 Lambda 函数的函数日志。如果 Lambda 函数使用提供的代码示例,您应该会在 CloudWatch Logs 日志组中看到来自 DynamoDB Streams 的打印的
NewImage
和OldImage
。 -
按照《Lambda 开发人员指南》中监控 Lambda 函数和排查其故障部分中的步骤,查看 Lambda 函数的错误计数和成功率(%)指标。
-
后续步骤
此示例提供了将单个 Lambda 函数作为目标的基本集成。要更好地了解更复杂的配置(例如,如何创建多个规则、创建多个目标、与其他服务集成以及扩充事件),请参阅完整的 EventBridge 用户指南:开始使用 EventBridge。
注意
请注意可能与您的应用程序相关的任何 EventBridge 配额。虽然 DynamoDB Streams 容量能够随您的表而扩展,但 EventBridge 配额是单独提供的。在大型应用程序中,需要注意的常见配额是每秒事务中的调用节流限制和每秒事务中的 PutEvents 节流限制。这些配额指定了可以发送到目标的调用次数以及每秒可以放入总线的事件数。