将 Amazon MQ 消息代理作为源 - Amazon EventBridge

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Amazon MQ 消息代理作为源

您可以使用 Pip EventBridge es 接收来自亚马逊 MQ 消息代理的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,您可以选择特定于 Amazon MQ 的设置。 EventBridge 在将数据发送到目标时,Pipes 会维护消息代理中记录的顺序。

Amazon MQ 是一项托管消息代理服务,用于 Apache ActiveMQRabbitMQ。消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议进行通信,使用主题或队列作为事件目标。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理,代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。安装代理后,它会为您的实例提供不同的网络拓扑和其他基础设施需求。

Amazon MQ 源有以下配置限制:

  • 跨账户 — EventBridge 不支持跨账户处理。您不能使用 EventBridge 来处理来自不同 AWS 账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 — 对于 ActiveMQ,仅支持 ActiveMQ。 SimpleAuthenticationPlugin对于 RabbitMQ,仅支持 PLAIN 身份验证机制。要管理凭证,请使用 AWS Secrets Manager。有关 ActiveMQ 身份验证的更多信息,请参阅《Amazon MQ 开发人员指南》中的使用 LDAP 集成 ActiveMQ 代理

  • 连接配额 - 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅《Amazon MQ 开发人员指南》中的 *Amazon MQ 中的配额*代理部分。

  • 连接 - 您可以在公有或私有虚拟私有云 (VPC) 中创建代理。对于私有 VPC,您的管道需要具备对 VPC 的访问权限才能接收消息。

  • 事件目标 - 仅支持队列目标。但是,您可以使用虚拟主题,在与管道交互时与,虚拟主题在内部与主题行为一致,在外部与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标和 RabbitMQ 网站上的虚拟主机

  • 网络拓扑 - 对于 ActiveMQ,管道仅支持一个单实例或备用代理。对于 RabbitMQ,每个管道只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息,请参阅《Amazon MQ 开发人员指南》中的 Active MQ 代理架构RabbitMQ 代理架构

  • 协议 - 支持的协议取决于您使用的 Amazon MQ 集成。

    • 对于 ActiveMQ 集成 EventBridge ,使用 /Java 消息服务 (JMS) 协议来使用 OpenWire消息。任何其他协议都不支持消息使用。 EventBridge 仅支持 JMS 协议中的TextMessageBytesMessage操作。有关该 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站OpenWire上的内容。

    • 对于 RabbitMQ 集成, EventBridge 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息,请参阅 RabbitMQ 网站上的 AMQP 0-9-1 完整参考指南

EventBridge 自动支持亚马逊 MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本,请参阅《Amazon MQ 开发人员指南》中的 Amazon MQ 发布说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有待机状态的经纪人,在窗口结束之前 EventBridge 不会处理消息。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 亚马逊 EventBridge 管道过滤

ActiveMQ

[ { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-west-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1---mq---us-west-2.amazonaws.com.rproxy.goskope.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ]

RabbitMQ

[ { "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "eventSourceKey": "pizzaQueue::/", "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ]

使用者组

要与 Amazon MQ 互动,请 EventBridge 创建一个可以从您的亚马逊 MQ 经纪人那里读取信息的消费者群组。使用与管道 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 来源,对记录进行 EventBridge 批处理,然后通过单个有效负载将其发送到您的函数。要控制行为,您可以配置批处理时段和批处理大小。 EventBridge 提取消息,直到出现以下情况之一:

  • 处理后的记录达到 6 MB 的有效负载大小上限。

  • 批处理时间窗过期。

  • 记录数达到完整批次大小。

EventBridge 将您的批处理转换为单个有效负载,然后调用您的函数。消息既不会永久保存,也不会反序列化。相反,使用者组会将其作为字节 BLOB 进行检索。然后以 base64 格式编码为 JSON 有效负载。如果管道对批处理中的任何消息返回错误,则 EventBridge 会重试整批消息,直到处理成功或消息过期。

网络配置

默认情况下,在 PubliclyAccessible 标志设置为 false 时创建 Amazon MQ 代理。只有在 PubliclyAccessible 设置为 true 时,代理才会接收公有 IP 地址。要获得对管道的完全访问权限,您的代理必须使用公有端点,或提供对 VPC 的访问权限。

如果您的亚马逊 MQ 代理不可公开访问,则 EventBridge 必须有权访问与您的代理关联的亚马逊虚拟私有云 (Amazon VPC) 资源。

  • 要访问您的 Amazon MQ 代理的 VPC, EventBridge 可以使用您的来源子网的出站互联网访问权限。对于公有子网,它必须是托管 NAT 网关。对于私有子网,它可以是 NAT 网关,也可以是您自己的 NAT。确保此 NAT 具有公共 IP 地址,可以连接到互联网。

  • EventBridge Pipes 还支持通过传送事件 AWS PrivateLink,允许您在不通过公共互联网的情况下将事件从位于 Amazon Virtual Private Cloud (Amazon VPC) 的事件源发送到 Pipes 目标。您可以使用 Pipes 从 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、自我管理的 Apache Kafka 以及位于私有子网中的 Amazon MQ 源进行轮询,而无需部署互联网网关、配置防火墙规则或设置代理服务器。

    要设置 VPC 终端节点,请参阅AWS PrivateLink 用户指南中的创建 VPC 终端节点。对于服务名称,请选择com.amazonaws.region.pipes-data

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则-允许 Amazon MQ 代理端口上为您的源指定的安全组的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Amazon MQ 代理端口上为您的源指定的安全组的所有流量。

    代理端口包括:

    • 9092 表示纯文本

    • 9094 适用于 TLS

    • 9096 for SASL

    • 9098 for IAM

注意

可通过 Amazon MQ API 发现您的 Amazon VPC 配置。在设置过程中,您不需要对其进行配置。