Amazon MQ for RabbitMQ 最佳实践 - Amazon MQ

Amazon MQ for RabbitMQ 最佳实践

以此作为参考快速找到在 Amazon MQ 上使用 RabbitMQ 代理最大程度提高性能和降低吞吐量成本的建议。

重要

目前,Amazon MQ 不支持或在 JSON 中使用结构化日志记录(在 RabbitMQ 3.9.x 中推出)。

重要

Amazon MQ for RabbitMQ 不支持用户名“guest”,并会在您创建新代理时删除默认的访客账户。Amazon MQ 还将定期删除任何由客户创建的名为“guest”的账户。

启用自动次要版本升级

使用最新的代理版本进行安全和错误修复,并改进性能。您可以启用 Amazon MQ 的自动次要版本升级,以管理升级到最新补丁版本。

使用已弃用的功能

如果您使用的是 RabbitMQ on Amazon MQ 3.13 版本,您将在 RabbitMQ 管理用户界面中看到一个横幅,其中显示:Deprecated features are being used.

Navigation bar with Overview tab selected, showing Totals section header.

这是因为 RabbitMQ on Amazon MQ 使用了以下 RabbitMQ 不再提供的功能,或为 RabbitMQ on Amazon MQ 自动配置的功能:

  • 经典队列镜像

  • 全局 QoS

  • 临时非独占队列

这是 3.13 版的信息说明横幅,无需任何操作。您的 Amazon MQ 代理将继续使用这些功能。

选择正确的代理实例类型以实现最佳吞吐量

代理实例类型的消息吞吐量取决于应用程序的使用案例。较小的代理实例类型(如 t3.micro)只应用于测试应用程序性能。在生产环境中使用较大的实例之前使用这些微型实例可以提高应用程序性能并有助于您降低开发成本。在实例类型 m5.large 及更大实例上,您可以使用集群部署来实现高可用性和消息持久性。较大的代理实例类型可以处理生产级别的客户端和队列、高吞吐量、内存中的消息和冗余消息。有关选择正确实例类型的更多信息,请参阅《大小调整指南》。

使用多个通道

为避免连接中断,请在单个连接上使用多个通道。应用程序应避免 1:1 的连接与通道比率。我们建议每个进程使用一个连接,然后每个线程使用一个通道。避免过度使用通道,以防通道泄漏。

启用延迟队列

如果处理大量消息的队列很长,启用延迟队列可以提高代理性能。

RabbitMQ 的默认行为是在内存中缓存消息,并且仅在代理需要更多可用内存时将其移动到磁盘。将消息从内存移动到磁盘需要时间,并且会停止消息处理。延迟队列通过尽快将消息存储到磁盘,大大加快了内存到磁盘的进程,从而减少了缓存在内存中的消息数量。

您可以通过在声明时设置 queue.declare 参数或通过 RabbitMQ 管理控制台配置策略来启用延迟队列。以下示例演示了如何使用 RabbitMQ Java 客户端库声明延迟队列。

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);

默认情况下,3.12.13 及更高版本上的所有 Amazon MQ for RabbitMQ 队列都表现为延迟队列。要升级到最新版本的 Amazon MQ for RabbitMQ,请参阅 升级 Amazon MQ 代理引擎版本

注意

启用延迟队列会增加磁盘输入/输出操作。

使用持久消息和持续队列

持久消息有助于防止在代理崩溃或重启的情况下丢失数据。持久消息一到达就会立即写入磁盘。但是,与延迟队列不同的是,持久消息同时在内存和磁盘中缓存,除非代理需要更多内存。在需要更多内存的情况下,通过管理将消息存储到磁盘的 RabbitMQ 代理机制从内存中删除消息,通常称为持久性层

要启用消息持久性,可以将队列声明为 durable 并将消息传递模式设置为 persistent。以下示例演示了如何使用 RabbitMQ Java 客户端库声明持续队列。在使用 AMQP 0-9-1 时,您可以通过设置“2”传送模式将消息标记为持久消息。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

将队列配置为持续队列后,您可以通过将 MessageProperties 设置为 PERSISTENT_TEXT_PLAIN 来将持久消息发送到您的队列,如以下示例所示。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

保持队列简短

在集群部署中,包含大量消息的队列可能会导致资源过度利用。当代理被过度利用时,重启 Amazon MQ for RabbitMQ 代理可能会导致性能进一步降低。如果重启,过度利用的代理可能会在 REBOOT_IN_PROGRESS 状态下变得反应迟钝。

维护时段,Amazon MQ 每次执行一个节点的所有维护工作,以确保代理保持正常运行。因此,在每个节点恢复正常运行时,队列可能需要同步。在同步过程中,需要复制到镜像的消息将从相应的 Amazon Elastic Block Store(Amazon EBS)卷加载到内存中,以进行批处理。批处理消息可以让队列更快地同步。

如果队列保持简短且消息较少,则队列会按预期成功同步并恢复正常运行。但是,如果批处理中的数据量接近节点的内存限制,节点会引发高内存警报,暂停队列同步。您可以通过比较 CloudWatch 中的 RabbitMemUsedRabbitMqMemLimit 代理节点指标来确认内存使用情况。在消耗或删除消息或批处理中的消息数量减少之前,同步无法完成。

如果集群部署暂停队列同步,我们建议使用或删除消息,以减少队列中的消息数量。一旦队列深度减少且队列同步完成,代理状态将更改为 RUNNING。要解决暂停的队列同步,您还可以应用策略来减少队列同步批处理大小

您还可以定义自动删除和 TTL 策略,以主动减少资源使用,并将来自使用者的 NACK 降到最低。在代理上重新排列消息需要耗费大量 CPU,因此大量 NACK 会影响代理的性能。

配置发布者确认和使用者交付确认

确认消息已发送到代理的过程称为发布者确认。发布者确认告知您的应用程序何时可靠地存储了消息。发布者确认还有助于控制存储到代理的消息速率。如果没有发布者确认,就无法确认消息是否已成功处理,您的代理可能会丢弃无法处理的消息。

同样,当客户端应用程序向代理发回消息的交付和使用确认时,称为使用者交付确认。在使用 RabbitMQ 代理时,这两种确认对于确保数据安全至关重要。

使用者传递确认通常在客户端应用程序上配置。使用 AMQP 0-9-1 时,可以通过配置 basic.consume 方法来启用确认。AMQP 0-9-1 客户端也可以通过发送 confirm.select 方法来配置发布者确认。

通常,在通道中启用传递确认。例如,使用 RabbitMQ Java 客户端库时,可以使用 Channel#basicAck 来设置一个简单的 basic.ack 肯定确认,如以下示例所示。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注意

未确认的消息必须在内存中缓存。您可以通过为客户端应用程序配置预提取设置,限制使用者预提取的消息数量。

您可以配置 consumer_timeout,以便在使用者不确认交付时进行检测。如果使用者没有在超时值内发送确认,通道将被关闭,您将收到 PRECONDITION_FAILED。要诊断错误,请使用 UpdateConfiguration API 来增加 consumer_timeout 值。

配置预提取

您可以使用 RabbitMQ 预提取值来优化使用者使用消息的方式。RabbitMQ 通过将预提取计数应用于使用者而不是通道,实现 AMQP 0-9-1 提供的通道预提取机制。预提取值用于指定在任何给定时间向使用者发送的消息数量。默认情况下,RabbitMQ 会为客户端应用程序设置无限制的缓冲区大小。

在为您的 RabbitMQ 使用者设置预提取计数时,需要考虑各种因素。首先,考虑使用者的环境和配置。由于使用者需要在处理消息时将所有消息保存在内存中,因此,较高的预提取值可能会对使用者的性能产生负面影响,在某些情况下,可能会导致使用者同时崩溃。同样,RabbitMQ 代理本身会将其发送的所有消息缓存在内存中,直到收到使用者确认。如果没有为使用者配置自动确认,并且使用者需要相对较长的时间来处理消息,则较高的预提取值可能会导致 RabbitMQ 服务器内存不足。

考虑到上述因素,我们建议始终设置预提取值,以防止由于大量未处理或未确认的消息而导致 RabbitMQ 代理或其使用者出现内存不足的情况。如果您需要优化代理来处理大量消息,您可以使用一系列预提取计数来测试您的代理和使用者,以确定与使用者处理消息所需的时间相比,网络开销在哪个点上变得微不足道。

注意
  • 如果您的客户端应用程序已配置为自动确认将消息传递给使用者,则设置预提取值将不起作用。

  • 所有预提取消息都会从队列中删除。

以下示例演示了如何使用 RabbitMQ Java 客户端库为单一使用者设置 10 的预提取值。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注意

在 RabbitMQ Java 客户端库中,global 标志的默认值设置为 false,所以上面的例子可以简单地写成 channel.basicQos(10)

配置 Celery

Python Celery 会发送许多不必要的消息,这些消息会使查找和处理有用的信息变得更加困难。为了降低噪音并使处理更容易,请输入以下命令:

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

自动从网络故障中恢复

我们建议始终启用自动网络恢复,以防止在客户端连接到 RabbitMQ 节点失败的情况下出现严重停机。自版本 4.0.0 起,RabbitMQ Java 客户端库默认支持自动网络恢复。

如果在连接的输入/输出循环中引发未处理的异常、检测到套接字读取操作超时,或者如果服务器失去检测信号,则会触发自动连接恢复。

如果客户端和 RabbitMQ 节点之间的初始连接失败,将不会触发自动恢复。我们建议您编写应用程序代码,以便通过重试连接来解决初始连接失败的问题。以下示例演示了如何使用 RabbitMQ Java 客户端库来重试初始网络故障。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注意

如果应用程序使用 Connection.Close 方法关闭连接,则不会启用或触发自动网络恢复。

为您的 RabbitMQ 代理启用 Classic Queue v2

我们建议在 3.10 和 3.11 版代理引擎上启用 Classic Queue v2(CQv2),以改进性能,包括:

  • 减少内存使用量

  • 改善使用者传递

  • 提高工作负载的吞吐量,让使用者跟上生产者的步伐

3.12.13 及更高版本上的所有 Amazon MQ for RabbitMQ 队列默认使用 CQv2。要升级到最新版本的 Amazon MQ for RabbitMQ,请参阅 升级 Amazon MQ 代理引擎版本

从 CQv1 迁移到 CQv2

要使用 CQv2,必须先启用 classic_mirrored_queue_version 功能标志。有关功能标志的更多信息,请参阅如何启用功能标志

要从 CQv1 迁移到 CQv2,必须创建新队列策略或编辑现有队列策略,并将 queue-version 策略密钥定义设置为 2。有关应用策略的更多信息,请参阅将策略应用于 Amazon MQ for RabbitMQ。有关使用队列策略启用 CQv2 的更多信息,请参阅 RabbitMQ 文档中的 Classic Queues

我们建议在开始迁移之前遵循其他最佳性能实践

如果您使用的是队列策略,则删除队列策略会将 CQv2 队列降级回 CQv1。我们不建议将 CQv2 队列降级为 CQv1,因为 RabbitMQ 会转换队列的磁盘表示形式。对于较深的队列来说,这可能会占用大量内存,而且非常耗时。