Amazon MQ for RabbitMQ best practices

Use this as a reference to quickly find recommendations for maximizing performance and minimizing throughput costs when working with RabbitMQ brokers on Amazon MQ.

Important

Currently, Amazon MQ does not support streams, or using structured logging in JSON, introduced in RabbitMQ 3.9.x.

Important

Amazon MQ for RabbitMQ does not support the username "guest", and will delete the default guest account when you create a new broker. Amazon MQ will also periodically delete any customer created account called "guest".

Turn on automatic minor version upgrades

Using the latest broker version gives you the most recent security fixes, bug fixes, and performance improvements. You can turn on automatic minor version upgrades so that Amazon MQ manages upgrades to the latest patch version for you.

Using deprecated features

If you are using version 3.13 for RabbitMQ on Amazon MQ, you will see a banner in the RabbitMQ Management UI that states: Deprecated features are being used.

This is because RabbitMQ on Amazon MQ uses the following features, which are either no longer offered on RabbitMQ or are automatically configured for RabbitMQ on Amazon MQ:

  • Classic Queue Mirroring

  • Global QoS

  • Transient Non-Exclusive Queues

This is an informational banner for version 3.13 that requires no action. Your Amazon MQ broker will continue to use these features.

Choose the correct broker instance type for the best throughput

The message throughput of a broker instance type depends on your application use case. Smaller broker instance types like t3.micro should only be used for testing application performance. Using these micro instances before using larger instances in production can improve application performance and help you keep development costs down. On instance types m5.large and above, you can use cluster deployments for high availability and message durability. Larger broker instance types can handle production levels of clients and queues, high throughput, messages in memory, and redundant messages. For more info on choosing the correct instance type, see Amazon MQ for RabbitMQ sizing guidelines.

Use multiple channels

To avoid connection churn, use multiple channels over a single connection. Applications should avoid a 1:1 connection to channel ratio. We recommend using one connection per process, and then one channel per thread. Avoid excessive channel usage to prevent channel leaks.
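
As a minimal sketch of the one-connection-per-process, one-channel-per-thread pattern, the following class hands each thread its own lazily created object from a shared factory. The class name PerThreadChannels and the Supplier-based factory are illustrative assumptions and are not part of the RabbitMQ Java client library; in a real application, the supplier would wrap a call such as connection.createChannel() on the single shared connection.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper (not part of the RabbitMQ client): gives each thread
// its own channel-like object, created on first use from a shared factory.
final class PerThreadChannels<C> {
    private final AtomicInteger created = new AtomicInteger();
    private final ThreadLocal<C> perThread;

    // channelFactory is assumed to open a channel on the one shared
    // connection and to handle any I/O errors itself.
    PerThreadChannels(Supplier<C> channelFactory) {
        this.perThread = ThreadLocal.withInitial(() -> {
            created.incrementAndGet();
            return channelFactory.get();
        });
    }

    // Returns the calling thread's channel, creating it only once per thread.
    C get() {
        return perThread.get();
    }

    // Number of channels opened so far; a steadily growing value can hint
    // at a channel leak.
    int createdCount() {
        return created.get();
    }
}
```

Channels created this way still need to be closed when their thread finishes, so this pattern fits best with a fixed-size pool of long-lived worker threads.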

Enable lazy queues

If you are working with very long queues that process large volumes of messages, enabling lazy queues can improve broker performance.

RabbitMQ's default behavior is to cache messages in memory and to move them to disk only when the broker needs more available memory. Moving messages from memory to disk takes time and halts message processing. Lazy queues significantly speed up the memory-to-disk process by storing messages to disk as soon as possible, resulting in fewer messages cached in memory.

You can enable lazy queues by setting the queue.declare arguments at the time of declaration, or by configuring a policy via the RabbitMQ management console. The following example demonstrates declaring a lazy queue using the RabbitMQ Java client library.

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

All Amazon MQ for RabbitMQ queues on 3.12.13 and above behave as lazy queues by default. To upgrade to the newest version of Amazon MQ for RabbitMQ, see Upgrading an Amazon MQ broker engine version.

Note

Enabling lazy queues can increase disk I/O operations.

Use persistent messages and durable queues

Persistent messages can help prevent data loss in situations where a broker crashes or restarts. Persistent messages are written to disk as soon as they arrive. Unlike lazy queues, however, persistent messages are cached both in memory and on disk unless the broker needs more memory. In cases where more memory is needed, messages are removed from memory by the RabbitMQ broker mechanism that manages storing messages to disk, commonly referred to as the persistence layer.

To enable message persistence, you can declare your queues as durable and set message delivery mode to persistent. The following example demonstrates using the RabbitMQ Java client library to declare a durable queue. When working with AMQP 0-9-1, you can mark messages as persistent by setting delivery mode "2".

boolean durable = true;
channel.queueDeclare("my_queue", durable, false, false, null);

Once you have configured your queue as durable, you can send a persistent message to your queue by setting MessageProperties to PERSISTENT_TEXT_PLAIN as shown in the following example.

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Keep queues short

In cluster deployments, queues with a large number of messages can lead to resource overutilization. When a broker is overutilized, rebooting an Amazon MQ for RabbitMQ broker can cause further degradation of performance. If rebooted, overutilized brokers might become unresponsive in the REBOOT_IN_PROGRESS state.

During maintenance windows, Amazon MQ performs all maintenance work one node at a time to ensure that the broker remains operational. As a result, queues might need to synchronize as each node resumes operation. During synchronization, messages that need to be replicated to mirrors are loaded into memory from the corresponding Amazon Elastic Block Store (Amazon EBS) volume to be processed in batches. Processing messages in batches lets queues synchronize faster.

If queues are kept short and messages are small, the queues successfully synchronize and resume operation as expected. However, if the amount of data in a batch approaches the node's memory limit, the node raises a high memory alarm, pausing the queue sync. You can confirm memory usage by comparing the RabbitMQMemUsed and RabbitMQMemLimit broker node metrics in CloudWatch. Synchronization can't complete until messages are consumed or deleted, or the number of messages in the batch is reduced.
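
The comparison of the two memory metrics can be sketched as a simple ratio check. The class name, method names, and the example threshold of 0.8 are illustrative assumptions, not Amazon MQ defaults; the two inputs stand for the used-memory and memory-limit values you read from CloudWatch for one broker node, assumed to be in bytes.

```java
// Hypothetical helper: compares the used-memory and memory-limit metric
// values (assumed to be in bytes) read from CloudWatch for one broker node.
final class NodeMemoryCheck {
    // Fraction of the node's memory limit that is currently in use.
    static double usedFraction(long memUsedBytes, long memLimitBytes) {
        if (memLimitBytes <= 0) {
            throw new IllegalArgumentException("memory limit must be positive");
        }
        return (double) memUsedBytes / memLimitBytes;
    }

    // True when usage is at or above the chosen warning threshold
    // (for example 0.8). The threshold is an assumption, not a broker setting.
    static boolean nearLimit(long memUsedBytes, long memLimitBytes, double threshold) {
        return usedFraction(memUsedBytes, memLimitBytes) >= threshold;
    }
}
```

A value close to 1.0 suggests that the node is near its high memory alarm and that queue synchronization is likely to pause.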

If queue synchronization is paused for a cluster deployment, we recommend consuming or deleting messages to lower the number of messages in queues. Once queue depth is reduced and queue sync completes, the broker status will change to RUNNING. To resolve a paused queue sync, you can also apply a policy to reduce the queue synchronization batch-size.

You can also define auto-delete and TTL policies to proactively reduce resource usage, as well as keep NACKs from consumers to a minimum. Requeueing messages on the broker is CPU-intensive so a high number of NACKs can affect broker performance.

Configure publisher confirmation and consumer delivery acknowledgement

The process of confirming that a message has been sent to the broker is known as publisher confirmation. Publisher confirms let your application know when messages have been reliably stored. Publisher confirms can also help control the rate of messages stored to the broker. Without publisher confirms, there is no confirmation that a message is processed successfully, and your broker may drop messages it cannot process.

Similarly, when a client application sends confirmation of delivery and consumption of messages back to the broker, it is known as consumer delivery acknowledgement. Both confirmation and acknowledgement are essential to ensuring data safety when working with RabbitMQ brokers.

Consumer delivery acknowledgement is typically configured on the client application. When working with AMQP 0-9-1, acknowledgement can be enabled by configuring the basic.consume method. AMQP 0-9-1 clients can also configure publisher confirms by sending the confirm.select method.
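
On the publisher side, one common way to use confirms is to remember each published message by its sequence number until the broker confirms it. The following is a minimal sketch of that bookkeeping only; the class OutstandingConfirms and its method names are illustrative assumptions. With the RabbitMQ Java client library, you would typically call channel.confirmSelect() once, read the sequence number from channel.getNextPublishSeqNo() before each publish, and call confirm from a listener registered with channel.addConfirmListener.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper: remembers published messages by sequence number
// until the broker confirms them.
final class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Call just before publishing, with the channel's next publish sequence number.
    void track(long sequenceNumber, String body) {
        pending.put(sequenceNumber, body);
    }

    // Call from the confirm callback. When multiple is true, the broker has
    // confirmed every message up to and including sequenceNumber.
    void confirm(long sequenceNumber, boolean multiple) {
        if (multiple) {
            pending.headMap(sequenceNumber, true).clear();
        } else {
            pending.remove(sequenceNumber);
        }
    }

    // Messages published but not yet confirmed by the broker.
    int unconfirmedCount() {
        return pending.size();
    }
}
```

Pausing publishing while unconfirmedCount() is above a limit you choose is one way to use confirms to control the publishing rate.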

Typically, delivery acknowledgement is enabled in a channel. For example, when working with the RabbitMQ Java client library, you can use Channel#basicAck to send a simple basic.ack positive acknowledgement, as shown in the following example.

// this example assumes an existing channel instance
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            // positively acknowledge a single delivery, the message will
            // be discarded
            channel.basicAck(deliveryTag, false);
        }
    });

Note

Unacknowledged messages must be cached in memory. You can limit the number of messages that a consumer pre-fetches by configuring pre-fetch settings for a client application.

You can configure consumer_timeout to detect when consumers do not acknowledge deliveries. If the consumer does not send an acknowledgement within the timeout value, the channel will be closed, and you will receive a PRECONDITION_FAILED error. To diagnose the error, use the UpdateConfiguration API to increase the consumer_timeout value.

Configure pre-fetching

You can use the RabbitMQ pre-fetch value to optimize how your consumers consume messages. RabbitMQ implements the channel pre-fetch mechanism provided by AMQP 0-9-1 by applying the pre-fetch count to consumers as opposed to channels. The pre-fetch value is used to specify how many messages are being sent to the consumer at any given time. By default, RabbitMQ sets an unlimited buffer size for client applications.

There are a variety of factors to consider when setting a pre-fetch count for your RabbitMQ consumers. First, consider your consumers' environment and configuration. Because consumers need to keep all messages in memory as they are being processed, a high pre-fetch value can have a negative impact on your consumers' performance and, in some cases, can cause a consumer to crash altogether. Similarly, the RabbitMQ broker itself keeps all messages that it sends cached in memory until it receives consumer acknowledgement. A high pre-fetch value can cause your RabbitMQ server to run out of memory quickly if automatic acknowledgement is not configured for consumers, and if consumers take a relatively long time to process messages.

With the above considerations in mind, we recommend always setting a pre-fetch value to prevent situations where a RabbitMQ broker or its consumers run out of memory due to a large number of unprocessed or unacknowledged messages. If you need to optimize your brokers to process large volumes of messages, you can test your brokers and consumers using a range of pre-fetch counts to determine the value at which network overhead becomes largely insignificant compared to the time it takes a consumer to process messages.
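
One possible way to read the results of such a test is to pick the smallest pre-fetch count whose throughput is already close to the best one you measured, since larger values then add memory pressure without a matching gain. The following sketch assumes that you have recorded throughput in messages per second for each pre-fetch count you tried; the class name, method name, and tolerance parameter are illustrative assumptions rather than a RabbitMQ or Amazon MQ feature.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for interpreting your own pre-fetch test results.
final class PrefetchTuning {
    // Returns the smallest tested pre-fetch count whose measured throughput
    // (messages per second) is within `tolerance` (for example 0.05 for 5%)
    // of the best measurement.
    static int smallestGoodPrefetch(Map<Integer, Double> throughputByPrefetch, double tolerance) {
        if (throughputByPrefetch.isEmpty()) {
            throw new IllegalArgumentException("no measurements");
        }
        double best = 0.0;
        for (double throughput : throughputByPrefetch.values()) {
            best = Math.max(best, throughput);
        }
        // A TreeMap iterates in ascending pre-fetch order.
        for (Map.Entry<Integer, Double> entry : new TreeMap<>(throughputByPrefetch).entrySet()) {
            if (entry.getValue() >= best * (1.0 - tolerance)) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("unreachable: the best measurement always qualifies");
    }
}
```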

Note
  • If your client applications are configured to automatically acknowledge delivery of messages to consumers, setting a pre-fetch value will have no effect.

  • All pre-fetched messages are removed from the queue.

The following example demonstrates setting a pre-fetch value of 10 for a single consumer using the RabbitMQ Java client library.

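ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// limit each consumer on this channel to 10 unacknowledged messages
channel.basicQos(10, false);
// DefaultConsumer is used here because QueueingConsumer is not available in newer client versions
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        // process the message, then acknowledge it
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

Note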

In the RabbitMQ Java client library, the default value for the global flag is set to false, so the above example can be written simply as channel.basicQos(10).

Configure Celery

Python Celery sends a lot of unnecessary messages that can make finding and processing the useful information harder. To reduce the noise and make processing easier, enter the following command:

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

Automatically recover from network failures

We recommend always enabling automatic network recovery to prevent significant downtime in cases where client connections to RabbitMQ nodes fail. The RabbitMQ Java client library supports automatic network recovery by default, beginning with version 4.0.0.

Automatic connection recovery is triggered if an unhandled exception is thrown in the connection's I/O loop, if a socket read operation timeout is detected, or if the server misses a heartbeat.

In cases where the initial connection between a client and a RabbitMQ node fails, automatic recovery will not be triggered. We recommend writing your application code to account for initial connection failures by retrying the connection. The following example demonstrates retrying initial network failures using the RabbitMQ Java client library.

ConnectionFactory factory = new ConnectionFactory();
// enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0.
factory.setAutomaticRecoveryEnabled(true);
// configure various connection settings
try {
    Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
    Thread.sleep(5000);
    // apply retry logic
}

Note

If an application closes a connection by using the Connection.Close method, automatic network recovery will not be enabled or triggered.
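
The retry logic for initial connection failures can be sketched as a small helper that retries with an increasing delay. This is a minimal sketch under stated assumptions: the class InitialConnectionRetry, its method name, and the doubling delay are illustrative choices and are not part of the RabbitMQ Java client library. The helper takes a generic Callable so that it does not depend on the client library; in an application, you could pass () -> factory.newConnection().

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: retries an initial connection attempt with a
// doubling delay between attempts.
final class InitialConnectionRetry {
    // Calls `connect` until it succeeds or maxAttempts is reached.
    // Only I/O failures (including java.net.ConnectException) and timeouts
    // are retried; the last failure is rethrown to the caller.
    static <T> T connectWithRetry(Callable<T> connect, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMillis = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return connect.call();
            } catch (IOException | TimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delayMillis);
                delayMillis *= 2;
            }
        }
    }
}
```

For example, connectWithRetry(() -> factory.newConnection(), 5, 1000) would make up to five attempts, waiting 1, 2, 4, and 8 seconds between them.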

Enable Classic Queue v2 for your RabbitMQ broker

We recommend enabling Classic Queue v2 (CQv2) on broker engine versions 3.10 and 3.11 for performance improvements including:

  • Decreased memory usage

  • Improved consumer delivery

  • Increased throughput for workloads where consumers keep up with producers

All Amazon MQ for RabbitMQ queues on 3.12.13 and above use CQv2 by default. To upgrade to the newest version of Amazon MQ for RabbitMQ, see Upgrading an Amazon MQ broker engine version.

Migrating from CQv1 to CQv2

To use CQv2, you must first enable the classic_mirrored_queue_version feature flag. For more information on feature flags, see How to enable feature flags.

To migrate from CQv1 to CQv2, you must create a new queue policy or edit an existing queue policy with the queue-version policy key definition set to 2. For more information on applying policies, see Applying policies to Amazon MQ for RabbitMQ. For more information on enabling CQv2 with a queue policy, see Classic Queues in the RabbitMQ documentation.
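
As an illustration of what such a policy contains, the following sketch builds a JSON policy body with the queue-version key set to 2. The field names (pattern, apply-to, priority, definition) follow the format used by the RabbitMQ management policy form and HTTP API; the class name, method name, and the example pattern and priority are assumptions that you would replace with your own values.

```java
// Hypothetical helper: builds the JSON body of a queue policy that sets
// queue-version to 2 for queues whose names match a regular expression.
final class QueueVersionPolicy {
    static String body(String pattern, int priority) {
        // Escape backslashes and double quotes so the pattern is valid inside a JSON string.
        String escaped = pattern.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"pattern\":\"" + escaped + "\","
             + "\"apply-to\":\"queues\","
             + "\"priority\":" + priority + ","
             + "\"definition\":{\"queue-version\":2}}";
    }
}
```

RabbitMQ applies only one policy to a queue at a time, so any keys from a policy that this one overrides need to be repeated in its definition.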

We recommend following our other best performance practices before starting the migration.

If you are using a queue policy, deleting the queue policy will downgrade CQv2 queues back to CQv1. We do not recommend downgrading CQv2 queues to CQv1 because RabbitMQ will convert the on-disk representation of the queue. This can be memory intensive and time-consuming for queues with high depth.