本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將 Python Pika 與 Amazon MQ for RabbitMQ 搭配使用
以下教學課程說明如何設定 Python Pika
先決條件
若要完成本教學課程的步驟,您需要以下先決條件:
一個 Amazon MQ for RabbitMQ 代理程式。如需詳細資訊,請參閱建立 Amazon MQ for RabbitMQ 代理程式。
您的作業系統應安裝 Python 3
。 -
使用 Python
pip
安裝 Pika。若要安裝 Pika,請開啟新的終端機視窗並執行以下命令。 $
python3 -m pip install pika
許可
在本教學課程中,您至少需要一個 Amazon MQ for RabbitMQ 代理程式使用者,其具有寫入和讀取虛擬主機的許可。下表將必要的最低許可描述為規則表達式 (regexp) 模式。
Tags (標籤) | 設定 regexp | 寫入 regexp | 讀取 regexp |
---|---|---|---|
none |
|
.* |
.* |
列出的使用者許可僅為使用者提供讀取和寫入許可,而不會授予對管理外掛程式的存取權,以便在代理程式上執行管理操作。您可以提供限制使用者存取指定佇列的規則運算式模式,進一步限制許可。例如,如果您將讀取規則表達式模式變更為 ^[hello world].*
,則使用者將僅擁有從以 hello world
開頭之佇列中進行讀取的許可。
如需建立 RabbitMQ 使用者及管理使用者標籤和許可的詳細資訊,請參閱 Amazon MQ for RabbitMQ 代理程式使用者。
步驟一:建立基本 Python Pika 用戶端
若要建立 Python Pika 用戶端基本類別,以定義建構函數並在與 Amazon MQ for RabbitMQ 代理程式互動時提供 TLS 組態所需的 SSL 內容,請執行下列動作。
-
開啟新終端機視窗,為您的專案建立新目錄,並導覽至該目錄。
$
mkdir pika-tutorial
$
cd pika-tutorial
-
建立新檔案
basicClient.py
,其包含下列 Python 程式碼。import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()
您現在可以為繼承自 BasicPikaClient
的發行者和取用者定義其他類別。
步驟二:建立發行者並傳送訊息
若要建立宣告佇列並傳送單一訊息的發行者,請執行下列動作。
-
複製下列程式碼範例的內容,並在上一個步驟建立的相同目錄中本機儲存為
publisher.py
。from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()
BasicMessageSender
類別繼承自BasicPikaClient
並實作其他方法來宣告佇列、向佇列傳送訊息以及關閉連線。程式碼範例會將訊息路由傳送至預設交換,且路由金鑰等於佇列名稱。 -
在
if __name__ == "__main__":
下方,請用下列資訊取代傳遞給BasicMessageSender
建構函數陳述式的參數。-
<broker-id>
– Amazon MQ 針對代理程式產生的唯一 ID。您可以從代理程式 ARN 解析 ID。例如,假定是以下 ARNarn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
,代理程式 ID 會是b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
。 -
<username>
– 具有足夠許可可將訊息寫入代理程式的代理程式使用者的使用者名稱。 -
<password>
– 具有足夠許可可將訊息寫入代理程式的代理程式使用者的密碼。 -
<region>
– AWS 區域,您在其中建立了 Amazon MQ for RabbitMQ 代理程式。例如us-west-2
。
-
-
在您建立
publisher.py
的相同目錄中執行下列命令。$
python3 publisher.py
如果程式碼執行成功,則您將在終端機視窗中看到下列輸出。
Trying to declare queue(hello world queue)... Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'
步驟三:建立取用者並接收訊息
若要建立從佇列接收單一訊息的取用者,請執行下列動作。
-
複製下列程式碼範例的內容,並在相同目錄中本機儲存為
consumer.py
。from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()
與您在上一個步驟建立的發行者類似,
BasicMessageReciever
繼承自BasicPikaClient
並實作其他方法來接收單一訊息並關閉連線。 -
在
if __name__ == "__main__":
下方,請用您的資訊取代傳遞給BasicMessageReciever
建構函數的參數。 -
在您的專案目錄中,執行下列命令。
$
python3 consumer.py
如果程式碼成功執行,則您會看到訊息內文以及包括路由金鑰在內的標頭,它們會顯示在終端機視窗中。
<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'
步驟四:(選用) 設定事件迴圈並取用訊息
若要取用佇列中的多條訊息,請使用 Pika 的 basic_consume
-
在
consumer.py
中,請將下列方法定義新增至BasicMessageReceiver
類別。def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
-
在
consumer.py
中,在if __name__ == "__main__":
下方,叫用您在上一步驟中定義的consume_messages
方法。if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
-
再次執行
consumer.py
,如果成功,佇列訊息會顯示在終端機視窗中。[*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!' [x] Received b'Hello World!' ...
後續步驟?
-
如需其他支援 RabbitMQ 用戶端程式庫的詳細資訊,請參閱 RabbitMQ 網站上的 RabbitMQ 用戶端文件
。