翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon MQ for RabbitMQ でPython Pika を使う
次のチュートリアルでは、Amazon MQ for RabbitMQ ブローカーに接続するように構成された TLS を使用して Python Pika
トピック
前提条件
このチュートリアルの最初のステップを完了するには、以下のものが必要です。
Amazon MQ for RabbitMQ ブローカー。詳細については、「Amazon MQ for RabbitMQ ブローカーを作成する」を参照してください。
オペレーティングシステム用に Python 3
がインストールされています。 -
Python
pip
を使用して、Pikaがインストールされました。Pika をインストールするには、新しいターミナルウィンドウを開き、以下を実行します。 $
python3 -m pip install pika
アクセス許可
このチュートリアルでは、vhost への書き込みおよび読み取りの許可を持つ Amazon MQ for RabbitMQ ブローカーユーザーが少なくとも 1 人必要です。以下の表は、正規表現 (regexp) パターンとして必要な最低限の許可を説明しています。
タグ | 設定 regexp | 書き込み regexp | 読み込み regexp |
---|---|---|---|
none |
|
.* |
.* |
リストされているユーザー許可は、ブローカで管理オペレーションを実行するための管理プラグインへのアクセスを付与することなく、ユーザーに読み取りおよび書き込み許可のみを提供します。特定のキューへのユーザーのアクセスを制限する正規表現パターンを提供することで、許可をさらに制限できます。例えば、読み取り regexp パターンを ^[hello world].*
に変更する場合、ユーザーには hello world
で始まるキューからの読み取り許可のみが付与されます。
RabbitMQ ユーザーの作成、およびユーザータグと許可の管理の詳細については、「Amazon MQ for RabbitMQ ブローカーのユーザー」を参照してください。
ステップ 1: 基本的な Python Pika クライアントを作成する
Amazon MQ for RabbitMQ ブローカーと対話するときに、コンストラクタを定義し、TLS 設定に必要な SSL コンテキストを提供する Python Pika クライアント基本クラスを作成するには、次の手順を実行します。
-
新しいターミナルウィンドウを開き、プロジェクトの新しいディレクトリを作成し、そのディレクトリに移動します。
$
mkdir pika-tutorial
$
cd pika-tutorial
-
以下の Python コードを含む
basicClient.py
というファイルを作成します。import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()
パブリッシャーとコンシューマに対して、BasicPikaClient
から継承する追加のクラスを定義できるようになりました。
ステップ 2: パブリッシャーを作成してメッセージを送信する
キューを宣言し、1 つのメッセージを送信するパブリッシャを作成するには、次の手順を実行します。
-
次のコードサンプルの内容をコピーし、前のステップで作成した同じディレクトリで、
publisher.py
と名前を付けてローカルに保存します。from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()
BasicMessageSender
クラスはBasicPikaClient
から継承され、キューの宣言、キューへのメッセージの送信、および接続を閉じるための追加のメソッドを実装します。コードサンプルでは、キューの名前と等しいルーティングキーを使用して、メッセージをデフォルトの交換にルーティングします。 -
[
if __name__ == "__main__":
] で、渡されたパラメータを次の情報を含むBasicMessageSender
コンストラクターステートメントで置換します。-
<broker-id>
– Amazon MQ がブローカー用に生成する一意の ID です。ID は、ブローカー ARN から解析できます。例えば、arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
という ARN の場合、ブローカー ID はb-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
になります。 -
<username>
- ブローカにメッセージを書き込むのに十分な許可を持つブローカユーザーのユーザー名。 -
<password>
- ブローカにメッセージを書き込むのに十分な許可を持つブローカユーザーのパスワード。 -
<region>
- Amazon MQ for RabbitMQ ブローカーを作成した AWS リージョン。例えば、us-west-2
と指定します。
-
-
publisher.py
を作成した同じディレクトリで次のコマンドを実行します。$
python3 publisher.py
コードが正常に実行された場合、ターミナルウィンドウに次の出力が表示されます。
Trying to declare queue(hello world queue)... Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'
ステップ 3: コンシューマを作成してメッセージを受信する
キューから 1 つのメッセージを受信するコンシューマを作成するには、次の手順を実行します。
-
次のコードサンプルの内容をコピーし、同じディレクトリで、
consumer.py
と名前を付けてローカルに保存します。from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()
前のステップで作成したパブリッシャーと同様に、
BasicMessageReciever
はBasicPikaClient
から継承し、単一のメッセージを受信し、接続を閉じるための追加のメソッドを実装します。 -
if __name__ == "__main__":
ステートメントで、渡されたパラメータを次の情報を含むBasicMessageReciever
コンストラクターに置換します。 -
プロジェクトディレクトリで次のコマンドを実行します。
$
python3 consumer.py
コードが正常に実行されると、メッセージ本文とルーティングキーを含むヘッダーがターミナルウィンドウに表示されます。
<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'
ステップ 4: (オプション) イベントループを設定し、メッセージを消費する
キューから複数のメッセージを消費するには、Pika の basic_consume
-
consumer.py
で、BasicMessageReceiver
クラスに以下のメソッド定義を追加します。def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
-
consumer.py
のif __name__ == "__main__":
の下で、前のステップで定義したconsume_messages
メソッドを呼び出します。if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
-
consumer.py
をもう一度実行し、成功すると、キューに入れられたメッセージがターミナルウィンドウに表示されます。[*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!' [x] Received b'Hello World!' ...
次のステップ
-
サポートされている他の RabbitMQ クライアントライブラリの詳細については、RabbitMQ のウェブサイトの「RabbitMQ クライアントドキュメント
」を参照してください。