结合 Amazon MQ 使用 Lambda - AWS Lambda

结合 Amazon MQ 使用 Lambda

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

Amazon MQ 是一项托管消息代理服务,用于 Apache ActiveMQRabbitMQ消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议,通过主题或队列事件目标进行通信。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。

您可使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。Lambda 通过事件源映射调用您的函数,事件源映射是从您的代理读取消息并同步调用函数的一种 Lambda 资源。

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性

Amazon MQ 事件源映射有以下配置限制:

  • 并发 – 使用 Amazon MQ 事件源映射的 Lambda 函数具有默认的最大并发设置。对于 ActiveMQ,Lambda 服务将每个 Amazon MQ 事件源映射的并发执行环境数量限制为 5 个。对于 RabbitMQ,每个 Amazon MQ 事件源映射的并发执行环境数量限制为 1 个。即使您更改了函数的预留或预调配并发设置,Lambda 服务也不会提供更多的执行环境。要请求增加单个 Amazon MQ 事件源映射的默认最大并发数,请联系 AWS Support 并提供事件源映射 UUID 和区域。因为是在特定的事件源映射级别而不是账户或区域级别增加,所以需要为每个事件源映射手动请求按比例增加。

  • 跨账户 – Lambda 不支持跨账户处理。您不能使用 Lambda 处理来自不同 AWS 账户 账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 – 对于 ActiveMQ,仅支持 ActiveMQ SimpleAuthenticationPlugin。对于 RabbitMQ,仅支持 PLAIN 身份验证机制。用户必须使用 AWS Secrets Manager 来管理凭据。有关 ActiveMQ 身份验证的更多信息,请参阅 Amazon MQ 开发人员指南中的使用 LDAP 集成 ActiveMQ 代理

  • 连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅 Amazon MQ 开发人员指南中的 Amazon MQ 中的配额代理部分。

  • 连接 – 您可以在公有或私有虚拟私有云(VPC)中创建代理。对于私有 VPC,您的 Lambda 函数需要具备对 VPC 的访问权限才能接收消息。有关更多信息,请参阅此部分后面的配置网络安全

  • 事件目标 – 仅支持队列目标。但是,您可以使用虚拟主题,虚拟主题在内部与主题行为一致,在与 Lambda 交互时与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标和 RabbitMQ 网站上的虚拟主机

  • 网络拓扑 – 对于 ActiveMQ,每个事件源映射仅支持一个单实例或备用代理。对于 RabbitMQ,每个事件源映射只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息,请参阅 Amazon MQ 开发人员指南中的 Active MQ 代理架构RabbitMQ 代理架构

  • 协议 – 支持的协议取决于 Amazon MQ 集成的类型。

    • 对于 ActiveMQ 集成,Lambda 使用 OpenWire/Java Message Service (JMS) 协议来使用消息。消息的使用不支持任何其他协议。在 JMS 协议中,仅支持 TextMessageBytesMessage。Lambda 还支持 JMS 自定义属性。有关 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站上的 OpenWire

    • 对于 RabbitMQ 集成,Lambda 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息,请参阅 RabbitMQ 网站上的 AMQP 0-9-1 完整参考指南

Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本,请参阅 Amazon MQ 开发人员指南中的 Amazon MQ 发布说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理,Lambda 将无法在该时段处理任何消息。

了解 Amazon MQ 的 Lambda 使用者组

为了与 Amazon MQ 进行交互,Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 事件源,Lambda 会将记录合并为批处理,然后通过单个有效负载中将其发送到您的函数。要控制行为,您可以配置批处理时段和批处理大小。Lambda 会持续提取消息,直到达到 6 MB 的最大有效负载大小、批处理时段过期或记录数达到完整批处理大小时为止。有关更多信息,请参阅 批处理行为

使用者组将消息作为字节 BLOB 进行检索,然后以 base64 格式编码为单个 JSON 有效负载,接下来调用您的函数。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

您可以使用 Amazon CloudWatch 中的 ConcurrentExecutions 指标监控给定函数的并发使用情况。有关并发的更多信息,请参阅 为函数配置预留并发

例 Amazon MQ 记录事件
ActiveMQ
{ "eventSource": "aws:mq", "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-east-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注意

在 RabbitMQ 示例中,pizzaQueue 是 RabbitMQ 队列的名称,/ 是虚拟主机的名称。接收消息时,事件源会将消息列在 pizzaQueue::/ 下。