Amazon MQ for RabbitMQ のベストプラクティス
このセクションは、Amazon MQ での RabbitMQ ブローカーの使用時にパフォーマンスを最大限に引き出し、スループットコストを最小限に抑えるための推奨事項をすばやく見つけるために使用してください。
重要
現在、Amazon MQ はストリーム
重要
Amazon MQ for RabbitMQ では、ユーザー名「guest」はサポートされず、デフォルトのゲストアカウントは新しいブローカーの作成時に削除されます。ユーザーが作成した「guest」というアカウントも、Amazon MQ によって定期的に削除されます。
トピック
自動マイナーバージョンアップグレードを有効にする
最新のブローカーバージョンを使用することで、セキュリティ修正とバグ修正が適用され、パフォーマンスが向上します。Amazon MQ の自動マイナーバージョンアップグレードを有効にすると、最新のパッチバージョンへのアップグレードを管理できます。
非推奨の機能を使用している場合
Amazon MQ で RabbitMQ バージョン 3.13 を使用している場合は、RabbitMQ 管理 UI に Deprecated features are being used.
というバナーが表示されます。
これは、RabbitMQ で提供されなくなった以下の機能が Amazon MQ の RabbitMQ で使用されているか、Amazon MQ の RabbitMQ に自動的に設定されているためです。
-
従来のキューミラーリング
-
グローバル QoS
-
非排他的な一時キュー
これはバージョン 3.13 の情報バナーであり、対応は必要ありません。Amazon MQ ブローカーは、引き続きこれらの機能を使用します。
最高のスループットのために正しいブローカーインスタンスタイプを選択する
ブローカーインスタンスタイプのメッセージスループットは、アプリケーションのユースケースに依存します。t3.micro
などの小さいブローカーインスタンスタイプは、アプリケーションのパフォーマンスをテストする場合にのみ使用することをお勧めします。大規模なインスタンスを本番環境で使用する前にこれらのマイクロインスタンスを使用すると、アプリケーションのパフォーマンスが向上し、開発コストを抑えることができます。m5.large
以上のインスタンスタイプでは、クラスターデプロイを使用して高可用性とメッセージの耐久性を確保できます。大きいブローカーインスタンスタイプは、本番稼働レベルのクライアントとキュー、高スループット、メモリ内のメッセージ、冗長メッセージを処理できます。適切なインスタンスタイプを選択する方法の詳細については、「サイズ設定ガイドライン」を参照してください。
複数のチャネルを使用する
接続チャーンを回避するには、1 つの接続で複数のチャネルを使用します。アプリケーションでは、チャネルに対する 1:1 の接続を避ける必要があります。プロセスごとに 1 つの接続を使用し、スレッドごとに 1 つのチャネルを使用することをお勧めします。チャネルのリークを防ぐために、チャネルを過剰に使用することは避けてください。
レイジーキューを有効にする
大量のメッセージを処理する非常に長いキューを使用している場合は、レイジーキューを有効にすることでブローカーのパフォーマンスが向上する可能性があります。
RabbitMQ のデフォルト動作では、メッセージをメモリにキャッシュし、ブローカーでより多くの使用可能なメモリが必要となった場合にのみ、それらをディスクに移動します。メモリからディスクへのメッセージの移動には時間がかかるため、メッセージ処理が停止します。レイジーキューはメッセージをディスクに可能な限り早急に保存し、その結果としてメモリにキャッシュされるメッセージが少なくなります。これにより、メモリからディスクへの移動処理が大幅に高速化されます。
レイジーキューは、宣言時に queue.declare
引数を設定する、または RabbitMQ のマネジメントコンソールでポリシーを設定することによって有効にできます。以下の例は、RabbitMQ Java クライアントライブラリを使用したレイジーキューの宣言を示しています。
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
3.12.13 以降のすべての Amazon MQ for RabbitMQ キューは、デフォルトでレイジーキューとして動作します。Amazon MQ for RabbitMQ の最新バージョンにアップグレードするには、「Amazon MQ ブローカーエンジンバージョンのアップグレード」を参照してください。
注記
レイジーキューを有効にすると、ディスク I/O 操作が増加する場合があります。
永続メッセージと持続キューを使用する
永続メッセージは、ブローカーがクラッシュまたは再起動するという状況におけるデータ損失の防止に役立ちます。永続メッセージは、到着するとすぐにディスクに書き込まれますが、レイジーキューとは異なり、ブローカーがより多くのメモリを必要とする場合を除き、永続メッセージはメモリとディスクの両方にキャッシュされます。より多くのメモリが必要な場合は、ディスクへのメッセージの保存を管理する RabbitMQ ブローカーメカニズム (一般に永続レイヤーと呼ばれます) によって、メモリからメッセージが削除されます。
メッセージの永続性を有効にするには、キューを durable
として宣言し、メッセージ配信モードを persistent
に設定できます。以下の例は、RabbitMQ Java クライアントライブラリ
boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);
キューを持続キューとして設定したら、以下の例にあるように、MessageProperties
を PERSISTENT_TEXT_PLAIN
に設定することによって永続メッセージをキューに送信できます。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
キューを短くしておく
クラスターデプロイでは、多数のメッセージを持つキューがリソースの過剰な使用につながる場合があります。ブローカーが過剰に使用されているときは、Amazon MQ for RabbitMQ ブローカーの再起動がパフォーマンスをさらに低下させる原因となる可能性があります。過剰に使用されているブローカーが再起動されると、REBOOT_IN_PROGRESS
状態のまま応答しなくなることがあります。
Amazon MQ はメンテナンスウィンドウ中、すべてのメンテナンス作業を一度に 1 ノードずつ実行して、ブローカーが動作可能な状態を維持することを確実にします。その結果、各ノードが操作を再開するときに、キューが同期する必要が生じる場合があります。同期中、ミラーにレプリケートする必要があるメッセージは、バッチで処理されるように、対応する Amazon Elastic Block Store (Amazon EBS) ボリュームからメモリにロードされます。メッセージをバッチで処理することにより、キューの同期が速くなります。
キューを短くし、メッセージを小さくしておくと、キューが正常に同期し、期待通りに操作を再開します。ただし、バッチ内のデータ量がノードのメモリ制限に近づいた場合は、ノードが高メモリアラームを発し、キューの同期を一時停止します。メモリ使用量は、CloudWatch で RabbitMemUsed および RabbitMqMemLimit のブローカーノードメトリクスを比較することで確認できます。同期は、メッセージが消費もしくは削除される、またはバッチ内のメッセージの数が減るまで完了できません。
クラスターデプロイのためにキューの同期化が一時停止される場合は、メッセージを消費または削除して、キュー内のメッセージの数を減らすことをお勧めします。キュー深度が減少し、キューの同期が完了すると、ブローカーのステータスが RUNNING
に変更されます。一時停止されたキューの同期を解決するには、キューの同期のバッチサイズを小さくするポリシーを適用することも可能です。
また、自動削除ポリシーと TTL ポリシーを定義すると、リソースの使用量をプロアクティブに削減するとともに、コンシューマーからの NACK を最小限に抑えることができます。ブローカーへのメッセージの再キュー処理は CPU 負荷が高いため、大量の NACK が発生するとブローカーのパフォーマンスに影響する可能性があります。
パブリッシャーの確認とコンシューマーの配信承認の設定
ブローカーにメッセージが送信されたことを確認するプロセスは、パブリッシャーの確認と呼ばれます。パブリッシャーは、メッセージが確実に格納されたときにアプリケーションに通知します。パブリッシャーの確認は、ブローカーに格納されるメッセージの割合を制御するためにも役立ちます。パブリッシャーの確認がないと、メッセージが正常に処理されたという確認が得られないため、ブローカーは処理できないメッセージをドロップする可能性があります。
同様に、クライアントアプリケーションはメッセージの配信と消費の確認をブローカーに返送します。これはコンシューマーの配信承認と呼ばれます。RabbitMQ ブローカーの使用時にデータの安全性を確保するには、確認と承認の両方が不可欠です。
コンシューマーの配信承認は、通常クライアントアプリケーションで設定されています。AMQP 0-9-1 を使用している場合は、basic.consume
メソッドを設定することで承認を有効化できます。AMQP 0-9-1 クライアントでは、confirm.select
メソッドを送信してパブリッシャーの確認を設定することもできます。
通常、配信承認はチャネルで有効化されます。例えば、RabbitMQ Java クライアントライブラリの使用時には、以下の例にあるように、Channel#basicAck
を使用してシンプルな basic.ack
肯定承認をセットアップできます。
// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注記
未承認メッセージは、メモリにキャッシュする必要があります。コンシューマーがプリフェッチするメッセージの数は、クライアントアプリケーションのプリフェッチを設定することによって制限できます。
consumer_timeout
を設定すると、コンシューマーから配信承認が届かない状況を検出できます。コンシューマーがタイムアウト値の時間内に承認を送信しない場合、チャネルは閉じられ、PRECONDITION_FAILED
が発生します。エラーを診断するには、UpdateConfiguration API を使用して consumer_timeout
値を大きくします。
プリフェッチを設定する
RabbitMQ のプリフェッチ値を使用して、コンシューマーがメッセージを消費する方法を最適化できます。RabbitMQ は、プリフェッチ数をチャネルではなくコンシューマーに適用することによって、AMQP 0-9-1 が提供するチャネルプリフェッチメカニズムを実装します。プリフェッチ値は、特定の時間にコンシューマに送信されるメッセージの数を指定するために使用されます。デフォルトで、RabbitMQ はクライアントアプリケーションに無制限のバッファサイズを設定します。
RabbitMQ コンシューマーにプリフェッチ数を設定するときに考慮する要因にはさまざまなものがあります。まず、コンシューマーの環境と設定を考慮します。コンシューマーは、メッセージが処理されるときにそれらすべてをメモリに保持する必要があるため、高いプリフェッチ値はコンシューマーのパフォーマンスに悪影響を及ぼし、場合によってはコンシューマー全体がクラッシュする原因になることもあります。同様に、RabbitMQ ブローカー自体も、コンシューマー承認を受け取るまで、送信するすべてのメッセージをメモリにキャッシュしておきます。コンシューマに自動承認が設定されておらず、コンシューマによるメッセージの処理に比較的長い時間がかかる場合、高いプリフェッチ値は RabbitMQ サーバーのメモリがすぐになくなる原因になる可能性があります。
上記の考慮事項を踏まえて、大量の未処理または未承認のメッセージが原因で RabbitMQ ブローカー、またはそのコンシューマーでメモリ不足が発生する状況を防ぐため、常にプリフェッチ値を設定することが推奨されます。大量のメッセージを処理するためにブローカーを最適化する必要がある場合は、さまざまなプリフェッチ数を使用してブローカーとコンシューマーをテストし、コンシューマーがメッセージを処理するためにかかる時間と比較して、ネットワークオーバーヘッドがおおむね軽微なものになる値を判断します。
注記
コンシューマーへのメッセージの配信を自動承認するようにクライアントアプリケーションが設定されている場合、プリフェッチ値を設定しても効果はありません。
プリフェッチされたメッセージはすべて、キューから削除されます。
以下の例は、RabbitMQ Java クライアントライブラリを使用した単一のコンシューマーへのプリフェッチ値 10
の設定を示しています。
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注記
RabbitMQ Java クライアントライブラリでは、global
フラグのデフォルト値が false
に設定されているので、上記の例は単純に channel.basicQos(10)
として記述できます。
Celery を設定
Python Celery は、有用な情報の検索と処理をより困難にする多くの不要なメッセージを送信します。ノイズを減らして処理を容易にするには、次のコマンドを入力します。
celery -A app_name worker --without-heartbeat --without-gossip --without-mingle
ネットワーク障害から自動的に回復する
RabbitMQ ノードへのクライアント接続が失敗した場合の大幅なダウンタイムを防ぐため、自動ネットワークリカバリを常に有効にしておくことをお勧めします。バージョン 4.0.0
以降の RabbitMQ Java クライアントライブラリは、自動ネットワークリカバリをデフォルトでサポートします。
自動接続リカバリは、接続の I/O ループで未処理の例外がスローされた場合、ソケット読み取り操作のタイムアウトが検出された場合、またはサーバーがハートビート
クライアントと RabbitMQ ノード間の初期接続が失敗した場合、自動リカバリはトリガーされません。アプリケーションコードは、接続の再試行によって、初期接続障害を考慮するように記述することをお勧めします。以下の例は、RabbitMQ Java クライアントライブラリを使用した初期ネットワーク障害の再試行を示しています。
ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注記
アプリケーションが Connection.Close
メソッド使用して接続を閉じる場合、自動ネットワークリカバリは有効化またはトリガーされません。
RabbitMQ ブローカーの Classic Queue v2 を有効にする
ブローカーエンジンバージョン 3.10 および 3.11 では、次のようなパフォーマンス向上のために Classic Queue v2 (CQv2) を有効にすることをお勧めします。
-
メモリ使用量を減らす
-
コンシューマーへの配信を改善する
-
コンシューマーがプロデューサーに遅れずに対応するワークロードのスループットを向上させる
3.12.13 以降のすべての Amazon MQ for RabbitMQ キューは、デフォルトで CQv2 を使用します。Amazon MQ for RabbitMQ の最新バージョンにアップグレードするには、「Amazon MQ ブローカーエンジンバージョンのアップグレード」を参照してください。
CQv1 から CQv2 への移行
CQv2 を使用するには、まず classic_mirrored_queue_version
機能フラグを有効にする必要があります。機能フラグの詳細については、「How to enable feature flags
CQv1 から CQv2 に移行するには、新しいキューポリシーを作成するか、既存のキューポリシーを編集して、queue-version
ポリシーキー定義を 2
に設定する必要があります。ポリシーの適用の詳細については、「Amazon MQ for RabbitMQ へのポリシーの適用」を参照してください。キューポリシーで CQv2 を有効にする方法の詳細については、RabbitMQ ドキュメントの「Classic Queues
移行を開始する前に、他のパフォーマンスに関するベストプラクティスに従うことをおすすめします。
キューポリシーを使用している場合は、キューポリシーを削除すると CQv2 キューが CQv1 にダウングレードされます。CQv2 キューを CQv1 にダウングレードすることはお勧めしません。RabbitMQ はキューのディスク上の表現を変換するためです。キューの深さが深い場合、これはメモリを大量に消費し、時間がかかる可能性があります。