Amazon MQ for RabbitMQ 最佳實踐 - Amazon MQ

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon MQ for RabbitMQ 最佳實踐

使用本節作為參考,快速找到在 Amazon MQ 上使用 RabbitMQ 代理程式時用於提升效能並降低輸送量成本的建議。

重要

Amazon MQ 不支援仲裁佇列。啟用仲裁佇列功能旗標並建立仲裁佇列會導致資料遺失。

重要

目前,Amazon MQ 不支援串流,也無法在 RabbitMQ 3.9.x 中推出的 JSON 中使用結構化日誌記錄。

重要

適用於 RabbitMQ 的 Amazon MQ 不支援使用者名稱「訪客」,而且會在您建立新的代理程式時刪除預設來賓帳戶。Amazon MQ 也會定期刪除任何客戶建立的名為「訪客」的帳戶。

啟用延遲佇列

如果您使用的是處理大量訊息的非常長的佇列,啟用延遲佇列可以改善代理程式效能。

RabbitMQ 的預設行為是快取記憶體中的訊息,而只有在代理程式需要更多可用記憶體時,將這些訊息移到磁碟。將訊息從記憶體移至磁碟需要時間,而且會停止訊息處理。延遲佇列會儘快將訊息儲存到磁碟,從而大幅加快記憶體到磁碟的處理速度,從而減少快取記憶體中的訊息。

您可在宣告時設定 queue.declare 引數,或透過 RabbitMQ 管理主控台設定政策,以啟用延遲佇列。以下範例示範如何使用 RabbitMQ Java 用戶端程式庫宣告延遲佇列。

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);

在 3.12.13 及更高版本上,所有適用於 RabbitMQ 佇列的 Amazon MQ 預設都表現為延遲佇列。若要升級到最新版本的 Amazon MQ 適用於 RabbitMQ,請參閱。升級 Amazon MQ 代理程式引擎版本

注意

啟用延遲佇列可以增加磁碟 I/O 操作。

使用持續且耐久的佇列

持續性訊息有助於防止代理程式當機或重新啟動時資料遺失。持續性訊息會在送達磁碟時立即寫入。然而,與延遲佇列不同,除非代理程式需要更多記憶體,否則持續性訊息會在記憶體和磁碟中快取。在需要更多記憶體的情況下,訊息由管理將訊息儲存到磁碟的 RabbitMQ 代理程式機制從記憶體中刪除,通常稱為持續性層

若要啟用訊息持續性,您可以將佇列宣告為 durable,並將訊息傳遞模式設定為 persistent。以下範例示範如何使用 RabbitMQ Java 用戶端程式庫宣告耐久的佇列。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

將佇列設定為耐久後,您可將 MessageProperties 設定為 PERSISTENT_TEXT_PLAIN (如以下範例所示),將持續性訊息傳送到您的佇列。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

讓佇列保持簡短

在叢集部署中,具有大量訊息的佇列可能會導致資源過度使用。當代理程式過度使用時,重新啟動 Amazon MQ for RabbitMQ 代理程式可能會導致效能進一步降低。若已重新啟動,過度使用的代理程式可能會在 REBOOT_IN_PROGRESS 狀態中變得沒有回應。

維護時段期間,Amazon MQ 會一次執行一個節點的所有維護工作,以確保代理程式維持運作狀態。因此,佇列可能需要在每個節點繼續操作時進行同步處理。在同步期間,需要複寫到鏡像的訊息會從對應的 Amazon Elastic Block Store (Amazon EBS) 磁碟區載入記憶體中,以便分批處理。分批處理訊息可讓佇列更快速地同步處理。

如果佇列保持簡短且訊息很小,則佇列會成功同步處理並繼續如預期般操作。不過,如果批次中的資料量接近節點的記憶體限制,節點就會引發高記憶體警示,暫停佇列同步。您可以比較中的RabbitMemUsedRabbitMqMemLimit Broker 節點測量結果,以確認記憶體使用狀況 CloudWatch。直到取用或刪除訊息或減少批次中的訊息數目,才能完成同步處理。

如果叢集部署暫停佇列同步處理,建議您取用或刪除訊息,以減少佇列中的訊息數目。一旦佇列深度減少且佇列同步完成,代理程式狀態就會變更為 RUNNING。若要解決暫停的佇列同步,您也可套用政策以降低佇列同步處理批次大小

警告

請勿重新啟動資源高漲的代理程式。

如果您在佇列同步處理暫停時重新啟動代理程式,代理程式會重新起始同步處理程序,這會在訊息從儲存體傳輸至節點記憶體時進一步降低代理程式資源,並導致代理程式在 REBOOT_IN_PROGRESS 狀態中變得沒有回應。

設定認可與確認

當用戶端應用程式將訊息的傳遞和取用確認傳送回代理程式時,這稱為消費者確認。同樣地,將確認傳送給發行者的程序也稱為發行者確認。認可和確認對於在使用 RabbitMQ 代理程式時確保資料安全至關重要。

消費者交付認可通常會設定於用戶端應用程式上。使用 AMQP 0-9-1 時,設定 basic.consume 或在使用 basic.code 方法提取訊息時,可以啟用認可。

通常,交付認可會在通道中啟用。例如,使用 RabbitMQ Java 用戶端程式庫時,您可使用 Channel#basicAck 來設定簡單的basic.ack 肯定認可,如以下範例所示。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注意

未認可的訊息必須在記憶體中快取。您可設定用戶端應用程式的預先擷取設定,以限制消費者預先擷取的訊息數量。

設定預先擷取

您可以使用 RabbitMQ 預先擷取值來最佳化消費者取用訊息的方式。RabbitMQ 可將預先擷取計數應用到取消費者而不是通道,以實作 AMQP 0-9-1 提供的通道預先擷取機制。預先擷取值用於指定在任何給定的時間傳送給消費者的訊息數量。根據預設,RabbitMQ 會為用戶端應用程式設定不受限的緩衝區大小。

為 RabbitMQ 消費者設定預先擷取計數時,需要考量各種因素。首先,請考量消費者的環境和組態。因為消費者需要在處理時讓所有訊息保留在記憶體中,因此高預先擷取值可能會對消費者的效能產生負面影響,在某些情況下,可能會導致消費者可能一起當機。同樣地,RabbitMQ 代理程式本身會讓其傳送的所有訊息保持在記憶體中快取,直到其收到消費者認可為止。如果沒有為消費者設定自動認可,且消費者花相對長的時間來處理訊息,則高預先擷取值可能會導致 RabbitMQ 伺服器快速耗盡記憶體。

考慮到上述考量,我們建議一律設定預先擷取值,以防止 RabbitMQ 代理程式或其消費者因大量未處理或未認可的訊息而耗盡記憶體的情況。如果您需要將代理程式最佳化以處理大量訊息,您可使用一系列預先擷取計數來測試代理程式和消費者,以判斷相較於消費者處理訊息所需的時間,網路負荷在哪個時候變得非常微不足道的值。

注意
  • 如果用戶端應用程式已設定為自動認可訊息傳遞給消費者,則設定預先擷取值沒有作用。

  • 所有預先擷取的訊息會從佇列中移除。

以下範例示範如何使用 RabbitMQ Java 用戶端程式庫,為單一消費者設定 10 的預先擷取值。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注意

在 RabbitMQ Java 用戶端程式庫中,global 旗標的預設值會設為 false,所以上述範例可以簡單地撰寫為 channel.basicQos(10)

設定 Celery

Python Celery 會傳送許多不必要的訊息,使搜尋與處理實用資訊變得更加困難。若要減少雜訊並使處理更加輕鬆,請輸入下列命令:

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

從網路故障中自動復原

我們建議一律啟用自動網路復原,以避免在用戶端連線到 RabbitMQ 節點失敗的情況下發生重大停機。從版本 4.0.0 開始,RabbitMQ Java 用戶端程式庫預設支援自動網路復原。

如果連線的 I/O 迴圈中擲回未處理的例外狀況、如果偵測到通訊端讀取作業逾時,或者伺服器遺漏活動訊號,就會觸發自動連線復原。

在用戶端與 RabbitMQ 節點之間的初始連線失敗的情況下,將不會觸發自動復原。我們建議您透過重試連線來撰寫應用程式程式碼,以解決初始連線失敗的問題。以下範例示範如何使用 RabbitMQ Java 用戶端程式庫重試初始網路故障。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注意

如果應用程式使用 Connection.Close 方法關閉連線,則不會啟用或觸發自動網路復原。

為您的 RabbitMQ 代理程式啟用傳統佇列 v2

我們建議在代理程式引擎版本 3.10 和 3.11 上啟用傳統佇列 v2 (CQv2),以改善效能,包括:

  • 減少記憶體使用

  • 提升消費者交付

  • 增加工作負載輸送量,讓消費者與生產者保持聯繫

在 3.12.13 及更新版本上,所有適用於 RabbitMQ 佇列的 Amazon MQ 預設都使用 CQv2。若要升級到最新版本的 Amazon MQ 適用於 RabbitMQ,請參閱。升級 Amazon MQ 代理程式引擎版本

從 CQv1 移轉至 CQv2

若要使用 CQv2,您必須先啟用classic_mirrored_queue_version功能旗標。如需有關功能旗標的詳細資訊,請參閱如何啟用功能旗標

若要從 CQv1 移轉至 CQv2,您必須在原則金鑰定義設為的情況下,建立新的佇列原則或編輯現有的佇列queue-version原則。2如需套用原則的詳細資訊,請參閱政策。如需使用佇列政策啟用 CQv2 的詳細資訊,請參閱 RabbitMQ 文件中的傳統佇列

我們建議您在開始遷移之前遵循其他最佳效能實務

如果您使用佇列政策,刪除佇列政策會將 CQv2 佇列降級回 CQv1。我們不建議將 CQv2 佇列降級為 CQv1,因為 RabbitMQ 會轉換佇列的磁碟上表示法。對於高深度的佇列,這可能會佔用大量記憶體且耗時。