RabbitMQ の一時停止されたキュー同期の解決 - Amazon MQ

RabbitMQ の一時停止されたキュー同期の解決

Amazon MQ for RabbitMQ クラスターデプロイでは、各キューに発行されたメッセージが 3 つのブローカーノード全体にレプリケートされます。ミラーリングと呼ばれるこのレプリケーションは、RabbitMQ ブローカーに高可用性 (HA) を提供します。クラスターデプロイ内のキューは、1 つのノード上にあるメインレプリカと、1 つ、または複数のミラーで構成されています。ミラーキューに適用されるすべての操作 (メッセージのキュー登録など) は、まずメインキューに適用され、その後ミラー全体にレプリケートされます。

例えば、メインノード (main) と 2 つのミラー (mirror-1 および mirror-2) の 3 つのノード全体にレプリケートされたミラーキューについて考えてみましょう。このミラーキュー内のすべてのメッセージがすべてのミラーに正常に伝播されると、キューが同期されたことになります。ノード (mirror-1) が一定期間使用できなくなった場合でも、キューは引き続き動作可能で、メッセージのキュー登録を継続できますが、キューを同期するには、mirror-1 が使用不可である間に main に発行されたメッセージが mirror-1 にレプリケートされる必要があります。

ミラーリングの詳細については、RabbitMQ ウェブサイトで「Classic Mirrored Queues」を参照してください。

メンテナンスとキューの同期

メンテナンスウィンドウ中、Amazon MQ はすべてのメンテナンス作業を一度に 1 ノードずつ実行して、ブローカーが動作可能な状態を維持することを確実にします。その結果、各ノードが操作を再開するときに、キューが同期する必要が生じる場合があります。同期中、ミラーにレプリケートする必要があるメッセージは、バッチで処理されるように、対応する Amazon Elastic Block Store (Amazon EBS) ボリュームからメモリにロードされます。メッセージをバッチで処理することにより、キューの同期が速くなります。

キューを短くし、メッセージを小さくしておくと、キューが正常に同期し、期待通りに操作を再開します。ただし、バッチ内のデータ量がノードのメモリ制限に近づいた場合は、ノードが高メモリアラームを発し、キューの同期を一時停止します。メモリ使用量は、CloudWatch で RabbitMemUsed および RabbitMqMemLimit のブローカーノードメトリクスを比較することで確認できます。同期は、メッセージが消費もしくは削除される、またはバッチ内のメッセージの数が減るまで完了できません。

注記

キューの同期のバッチサイズを小さくすると、レプリケーショントランザクション数の増加につながる可能性があります。

一時停止されたキューの同期を解決するには、ha-sync-batch-size ポリシーの適用とキューの同期の再開について説明する、このチュートリアルのステップに従ってください。

前提条件

このチュートリアルには、管理者権限を持つ Amazon MQ for RabbitMQ ブローカーユーザーが必要です。ブローカーを初めて作成したときに作成された管理者ユーザー、またはその後で作成した別のユーザーを使用できます。以下の表は、正規表現 (regexp) パターンとしての必要な管理者ユーザータグと許可です。

タグ 読み込み regexp 設定 regexp 書き込み regexp
administrator .* .* .*

RabbitMQ ユーザーの作成、およびユーザータグと許可の管理の詳細については、「Amazon MQ for RabbitMQ ブローカーのユーザー」を参照してください。

ステップ 1: ha-sync-batch-size ポリシーを適用する

以下の手順では、ブローカーで作成されたすべてのキューに適用されるポリシーの追加について説明します。RabbitMQ ウェブコンソールまたは RabbitMQ Management API を使用できます。詳細については、RabbitMQ ウェブサイトの「Management Plugin」を参照してください。

RabbitMQ ウェブコンソールを使用して ha-sync-batch-size ポリシーを適用する
  1. Amazon MQ コンソールにサインインします。

  2. 左側のナビゲーションペインで [Brokers] (ブローカー) をクリックします。

  3. ブローカーのリストから、新しいポリシーを適用するブローカーの名前を選択します。

  4. ブローカーのページの [Connections] (接続) セクションで、RabbitMQ ウェブコンソール URL をメモします。新しいブラウザタブまたはウィンドウに RabbitMQ ウェブコンソールが開きます。

  5. ブローカー管理者のサインイン認証情報を使用して RabbitMQ ウェブコンソールにログインします。

  6. RabbitMQ ウェブコンソールのページ上部で、[Admin] (管理) をクリックします。

  7. [Admin] (管理) ページの右側にあるナビゲーションペインで [Policies] (ポリシー) をクリックします。

  8. [Policies] (ポリシー) ページに、ブローカーの現在の [User policies] (ユーザーポリシー) が表示されます。[User policies] (ユーザーポリシー) の下で、[Add / update a policy] (ポリシーの追加/更新) を展開します。

    注記

    デフォルトで、Amazon MQ for RabbitMQ クラスターは、ha-all-AWS-OWNED-DO-NOT-DELETE という名前の初期ブローカーポリシーを使用して作成されます。Amazon MQ はこのポリシーを管理して、ブローカー上のすべてのキューが 3 つのノードすべてにレプリケートされ、キューが自動的に同期化されることを確実にします。

  9. 新しいブローカーポリシーを作成するには、[Add / update a policy] (ポリシーの追加/更新) で以下を実行します。

    1. [Name] (名前) には、ポリシーの名前 (batch-size-policy など) を入力します。

    2. [Pattern] (パターン) には regexp パターン .* を入力して、ポリシーがブローカー上のすべてのキューと一致するようにします。

    3. [Apply to] (適用先) には、ドロップダウンリストから [Exchanges and queues] (エクスチェンジとキュー) を選択します。

    4. [Priority] (優先順位) には、vhost に適用されたその他すべてのポリシーよりも大きい整数を入力します。RabbitMQ のキューとエクスチェンジに適用できるのは、常に 1 つのポリシー定義セットのみです。RabbitMQ は、一致するポリシーで、最高の優先順位値を持つものを選択します。ポリシーの優先順位とポリシーの結合方法の詳細については、RabbitMQ サーバードキュメントの「Policies」を参照してください。

    5. [Definition] (定義) には、以下のキーバリューペアを追加します。

      • ha-sync-batch-size=100。ドロップダウンリストから [Number] (数値) を選択します。

        注記

        ha-sync-batch-size の値は、キュー内の同期されていないメッセージの数とサイズに基づいて調整と較正を行う必要がある場合があります。

      • ha-mode=all。ドロップダウンリストから [String] (文字列) を選択します。

        重要

        ha-mode 定義は、すべての HA 関連ポリシーに必須です。省略すると、検証が失敗します。

      • ha-sync-mode=automatic。ドロップダウンリストから [String] (文字列) を選択します。

        注記

        ha-sync-mode 定義は、すべてのカスタムポリシーに必須です。省略すると、Amazon MQ が定義を自動的に付加します。

    6. [Add / update policy] (ポリシーを追加/更新) をクリックします。

  10. [User policies] (ユーザーポリシー) リストに新しいポリシーが表示されることを確認します。

RabbitMQ Management API を使用して ha-sync-batch-size ポリシーを適用する
  1. Amazon MQ コンソールにサインインします。

  2. 左側のナビゲーションペインで [Brokers] (ブローカー) をクリックします。

  3. ブローカーのリストから、新しいポリシーを適用するブローカーの名前を選択します。

  4. ブローカーのページの [Connections] (接続) セクションで、RabbitMQ ウェブコンソール URL をメモします。これは、HTTP リクエストで使用するブローカーエンドポイントです。

  5. 任意の新しいターミナルまたはコマンドラインウィンドウを開きます。

  6. 新しいブローカーポリシーを作成するには、以下の curl コマンドを入力します。このコマンドでは、%2F としてエンコードされているデフォルト / vhost 上のキューを前提としています。

    注記

    ユーザー名パスワードを、ブローカー管理者のサインイン認証情報に置き換えます。ha-sync-batch-size の値 (100) は、キュー内の同期されていないメッセージの数とサイズに基づいて調整と較正を行う必要がある場合があります。ブローカーエンドポイントを先ほどメモした URL に置き換えます。

    curl -i -u username:password -H "content-type:application/json" -XPUT \ -d '{"pattern":".*", "priority":1, "definition":{"ha-sync-batch-size":100, "ha-mode":"all", "ha-sync-mode":"automatic"}}' \ https://b-589c045f-f8ln-4ab0-a89c-co62e1c32ef8.mq.us-west-2.amazonaws.com/api/policies/%2F/batch-size-policy
  7. 新しいポリシーがブローカーのユーザーポリシーに追加されていることを確認するには、以下の curl コマンドを入力して、すべてのブローカーポリシーをリストします。

    curl -i -u username:password https://b-589c045f-f8ln-4ab0-a89c-co62e1c32ef8.mq.us-west-2.amazonaws.com/api/policies

ステップ 2: キューの同期を再開する

ブローカーに新しい ha-sync-batch-size ポリシーを適用したら、キューの同期を再開します。

RabbitMQ ウェブコンソールを使用してキューの同期を再開する
注記

RabbitMQ ウェブコンソールを開くには、このチュートリアルのステップ 1 にある前述の手順を参照してください。

  1. RabbitMQ ウェブコンソールのページ上部で、[Queues] (キュー) をクリックします。

  2. [Queues] (キュー) ページの [All queues] (すべてのキュー) で、一時停止されたキューを見つけます。キューが、[Features] (特徴) 列に作成された新しいポリシーの名前をリストします (batch-size-policy など)。

  3. 縮小されたバッチサイズで同期プロセスを再開するには、[Restart sync] (同期を再開) をクリックします。

注記

同期が一時停止して正常に終了しない場合は、ha-sync-batch-size の値を低くして、もう一度キューの同期を再開してみてください。

次のステップ

  • キューが正常に同期化されたら、Amazon CloudWatch メトリクス RabbitMQMemUsed を表示することで、RabbitMQ ノードが使用するメモリの量をモニタリングできます。RabbitMQMemLimit メトリクスを表示して、ノードのメモリ制限をモニタリングすることもできます。詳細については、Amazon MQ 向けの CloudWatch メトリクスへのアクセスおよびAmazon MQ for RabbitMQ ブローカーで利用可能な CloudWatch メトリクスを参照してください。

  • キューの同期が一時停止しないようにするため、キューを短くしておき、メッセージを処理することをお勧めします。メッセージサイズが大きいワークロードの場合は、より多くのメモリを備えたより大きなインスタンスサイズにブローカーインスタンスタイプをアップグレードすることもお勧めします。ブローカーインスタンスタイプとブローカー設定の編集に関する詳細については、「Amazon MQ for RabbitMQ インスタンスタイプ」および「ブローカー設定の編集」を参照してください。

  • 新しい Amazon MQ for RabbitMQ ブローカーを作成するときは、ブローカーのパフォーマンスを最適化するために、Amazon MQ が一連のデフォルトブローカーポリシーと仮想ホスト制限を適用します。お使いのブローカーに推奨されるデフォルトのポリシーと制限がない場合は、独自のポリシーと制限を作成することをお勧めします。デフォルトのポリシーと vhost 制限の作成に関する詳細については、「Amazon MQ for RabbitMQ のブローカーデフォルト」を参照してください。

  • UpdateBrokerInput – Amazon MQ API を使用してブローカーインスタンスタイプを更新するには、このブローカープロパティを使用します。

  • Parameters and Policies (RabbitMQ サーバードキュメント) – RabbitMQ のウェブサイトで、RabbitMQ のパラメータとポリシーの詳細について学びます。

  • RabbitMQ Management HTTP API – RabbitMQ Management API の詳細について学びます。