Amazon MQ for RabbitMQ のベストプラクティス - Amazon MQ

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MQ for RabbitMQ のベストプラクティス

このセクションは、Amazon MQ での RabbitMQ ブローカーの使用時にパフォーマンスを最大限に引き出し、スループットコストを最小限に抑えるための推奨事項をすばやく見つけるために使用してください。

重要

Amazon MQ はクォーラムキューをサポートしません。クォーラムキュー機能フラグの有効化とクォーラムキューの作成は、データ損失の原因になります。

重要

現在、Amazon MQ はストリームや、RabbitMQ 3.9.x で導入された JSON での構造化ロギングの使用をサポートしていません。

重要

Amazon MQ for RabbitMQ はユーザー名「ゲスト」をサポートしておらず、新しいブローカーを作成するとデフォルトのゲストアカウントが削除されます。Amazon MQ は、お客様が作成した「ゲスト」というアカウントも定期的に削除します。

レイジーキューを有効にする

大量のメッセージを処理する非常に長いキューを使用している場合、レイジーキューを有効にするとブローカーのパフォーマンスが向上します。

RabbitMQ のデフォルト動作では、メッセージをメモリにキャッシュし、ブローカーでより多くの使用可能なメモリが必要となった場合にのみ、それらをディスクに移動します。メモリからディスクへのメッセージの移動には時間がかかり、メッセージ処理が停止します。レイジーキューは、メッセージをできるだけ早くディスクに保存することで、メモリからディスクへのプロセスを大幅に高速化し、メモリにキャッシュされるメッセージを減らします。

レイジーキューは、宣言時に queue.declare 引数を設定する、または RabbitMQ のマネジメントコンソールでポリシーを設定することによって有効にできます。以下の例は、RabbitMQ Java クライアントライブラリを使用したレイジーキューの宣言を示しています。

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);

3.12.13 以降のすべての Amazon MQ for RabbitMQ キューは、デフォルトでレイジーキューとして動作します。Amazon MQ for RabbitMQ の最新バージョンにアップグレードするには、「」を参照してくださいAmazon MQ ブローカーエンジンバージョンのアップグレード

注記

レイジーキューを有効にすると、ディスク I/O 操作が増加する場合があります。

永続キューと持続キューを使用する

永続メッセージは、ブローカーがクラッシュまたは再起動するという状況におけるデータ損失の防止に役立ちます。永続メッセージは、到着するとすぐにディスクに書き込まれますが、レイジーキューとは異なり、ブローカーがより多くのメモリを必要とする場合を除き、永続メッセージはメモリとディスクの両方にキャッシュされます。より多くのメモリが必要な場合は、ディスクへのメッセージの保存を管理する RabbitMQ ブローカーメカニズム (一般に永続レイヤーと呼ばれます) によって、メモリからメッセージが削除されます。

メッセージの永続性を有効にするには、キューを durable として宣言し、メッセージ配信モードを persistent に設定できます。以下の例は、RabbitMQ Java クライアントライブラリを使用した持続キューの宣言を示しています。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

キューを持続キューとして設定したら、以下の例にあるように、MessagePropertiesPERSISTENT_TEXT_PLAIN に設定することによって永続メッセージをキューに送信できます。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

キューを短くしておく

クラスターデプロイでは、多数のメッセージを持つキューがリソースの過剰な使用につながる場合があります。ブローカーが過剰に使用されているときは、Amazon MQ for RabbitMQ ブローカーの再起動がパフォーマンスをさらに低下させる原因となる可能性があります。過剰に使用されているブローカーが再起動されると、REBOOT_IN_PROGRESS 状態のまま応答しなくなることがあります。

Amazon MQ はメンテナンスウィンドウ中、すべてのメンテナンス作業を一度に 1 ノードずつ実行して、ブローカーが動作可能な状態を維持することを確実にします。その結果、各ノードが操作を再開するときに、キューが同期する必要が生じる場合があります。同期中、ミラーにレプリケートする必要があるメッセージは、バッチで処理されるように、対応する Amazon Elastic Block Store (Amazon EBS) ボリュームからメモリにロードされます。メッセージをバッチで処理することにより、キューの同期が速くなります。

キューを短くし、メッセージを小さくしておくと、キューが正常に同期し、期待通りに操作を再開します。ただし、バッチ内のデータ量がノードのメモリ制限に近づいた場合は、ノードが高メモリアラームを発し、キューの同期を一時停止します。で RabbitMemUsedRabbitMqMemLimitブローカーノードのメトリクス CloudWatchを比較することで、メモリ使用量を確認できます。同期は、メッセージが消費もしくは削除される、またはバッチ内のメッセージの数が減るまで完了できません。

クラスターデプロイのためにキューの同期化が一時停止される場合は、メッセージを消費または削除して、キュー内のメッセージの数を減らすことをお勧めします。キュー深度が減少し、キューの同期が完了すると、ブローカーのステータスが RUNNING に変更されます。一時停止されたキューの同期を解決するには、キューの同期のバッチサイズを小さくするポリシーを適用することも可能です。

警告

多くのリソースを使用して実行されているブローカーは再起動しないでください。

キューの同期が一時停止しているときにブローカーを再起動すると、ブローカーは同期プロセスを再開します。これにより、メッセージがストレージからノードメモリに転送されるため、ブローカーリソースがさらに低下し、その結果、ブローカーが REBOOT_IN_PROGRESS 状態のまま応答しなくなる可能性があります。

承認と確認を設定する

クライアントアプリケーションによるメッセージの配信確認と消費確認のブローカーへの返送は、コンシューマー承認として知られています。同様に、パブリッシャーに確認を送信するプロセスはパブリッシャー確認として知られています。RabbitMQ ブローカーの使用時におけるデータの安全性を確実にするには、承認と確認の両方が不可欠です。

コンシューマーの配信承認は、通常クライアントアプリケーションで設定されています。AMQP 0-9-1 を使用する場合、承認は basic.consume を設定して有効にする、または basic.code メソッドを使用してメッセージを取得するときに有効にすることができます。

通常、配信承認はチャネルで有効化されます。例えば、RabbitMQ Java クライアントライブラリの使用時には、以下の例にあるように、Channel#basicAck を使用してシンプルな basic.ack 肯定承認をセットアップできます。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注記

未承認メッセージは、メモリにキャッシュする必要があります。コンシューマーがプリフェッチするメッセージの数は、クライアントアプリケーションのプリフェッチを設定することによって制限できます。

プリフェッチを設定する

RabbitMQ のプリフェッチ値を使用して、コンシューマーがメッセージを消費する方法を最適化できます。RabbitMQ は、プリフェッチ数をチャネルではなくコンシューマーに適用することによって、AMQP 0-9-1 が提供するチャネルプリフェッチメカニズムを実装します。プリフェッチ値は、特定の時間にコンシューマに送信されるメッセージの数を指定するために使用されます。デフォルトで、RabbitMQ はクライアントアプリケーションに無制限のバッファサイズを設定します。

RabbitMQ コンシューマーにプリフェッチ数を設定するときに考慮する要因にはさまざまなものがあります。まず、コンシューマーの環境と設定を考慮します。コンシューマーは、メッセージが処理されるときにそれらすべてをメモリに保持する必要があるため、高いプリフェッチ値はコンシューマーのパフォーマンスに悪影響を及ぼし、場合によってはコンシューマー全体がクラッシュする原因になることもあります。同様に、RabbitMQ ブローカー自体も、コンシューマー承認を受け取るまで、送信するすべてのメッセージをメモリにキャッシュしておきます。コンシューマに自動承認が設定されておらず、コンシューマによるメッセージの処理に比較的長い時間がかかる場合、高いプリフェッチ値は RabbitMQ サーバーのメモリがすぐになくなる原因になる可能性があります。

上記の考慮事項を踏まえて、大量の未処理または未承認のメッセージが原因で RabbitMQ ブローカー、またはそのコンシューマーでメモリ不足が発生する状況を防ぐため、常にプリフェッチ値を設定することが推奨されます。大量のメッセージを処理するためにブローカーを最適化する必要がある場合は、さまざまなプリフェッチ数を使用してブローカーとコンシューマーをテストし、コンシューマーがメッセージを処理するためにかかる時間と比較して、ネットワークオーバーヘッドがおおむね軽微なものになる値を判断します。

注記
  • コンシューマーへのメッセージの配信を自動承認するようにクライアントアプリケーションが設定されている場合、プリフェッチ値を設定しても効果はありません。

  • プリフェッチされたメッセージはすべて、キューから削除されます。

以下の例は、RabbitMQ Java クライアントライブラリを使用した単一のコンシューマーへのプリフェッチ値 10 の設定を示しています。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注記

RabbitMQ Java クライアントライブラリでは、global フラグのデフォルト値が false に設定されているので、上記の例は単純に channel.basicQos(10) として記述できます。

Celery を設定

Python Celery は、有用な情報の検索と処理をより困難にする多くの不要なメッセージを送信します。ノイズを減らして処理を容易にするには、次のコマンドを入力します。

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

ネットワーク障害から自動的に回復する

RabbitMQ ノードへのクライアント接続が失敗した場合の大幅なダウンタイムを防ぐため、自動ネットワークリカバリを常に有効にしておくことをお勧めします。バージョン 4.0.0 以降の RabbitMQ Java クライアントライブラリは、自動ネットワークリカバリをデフォルトでサポートします。

自動接続リカバリは、接続の I/O ループで未処理の例外がスローされた場合、ソケット読み取り操作のタイムアウトが検出された場合、またはサーバーがハートビートを受信しない場合にトリガーされます。

クライアントと RabbitMQ ノード間の初期接続が失敗した場合、自動リカバリはトリガーされません。アプリケーションコードは、接続の再試行によって、初期接続障害を考慮するように記述することをお勧めします。以下の例は、RabbitMQ Java クライアントライブラリを使用した初期ネットワーク障害の再試行を示しています。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注記

アプリケーションが Connection.Close メソッド使用して接続を閉じる場合、自動ネットワークリカバリは有効化またはトリガーされません。

RabbitMQ ブローカーの Classic Queue v2 を有効にする

ブローカーエンジンバージョン 3.10 および 3.11 で Classic Queue v2 (CQv2) を有効にして、次のようなパフォーマンスを向上させることをお勧めします。

  • メモリ使用量を減らす

  • コンシューマーへの配信を改善する

  • コンシューマーがプロデューサーに遅れずに対応するワークロードのスループットを向上させる

3.12.13 以降のすべての Amazon MQ for RabbitMQ キューは、デフォルトで CQv2 を使用します。Amazon MQ for RabbitMQ の最新バージョンにアップグレードするには、「」を参照してくださいAmazon MQ ブローカーエンジンバージョンのアップグレード

CQv1 から CQv2 への移行

CQv2 を使用するには、まずclassic_mirrored_queue_version機能フラグを有効にする必要があります。機能フラグの詳細については、「機能フラグを有効にする方法」を参照してください。

CQv1 から CQv2 に移行するには、新しいキューポリシーを作成するか、ポリシーキー定義を に設定して既存のキューqueue-versionポリシーを編集する必要があります2。ポリシーの適用の詳細については、「」を参照してくださいポリシー。キューポリシーで CQv2 を有効にする方法の詳細については、RabbitMQ ドキュメントの「Classic Queues」を参照してください。

移行を開始する前に、他のパフォーマンスに関するベストプラクティスに従うことをおすすめします。

キューポリシーを使用している場合は、キューポリシーを削除すると CQv2 キューが CQv1 にダウングレードされます。CQv2 キューを CQv1 にダウングレードすることはお勧めしません。RabbitMQ はキューのディスク上の表現を変換するためです。キューの深さが深い場合、これはメモリを大量に消費し、時間がかかる可能性があります。