RabbitMQ용 Amazon MQ와 함께 Python Pika 사용 - Amazon MQ

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

RabbitMQ용 Amazon MQ와 함께 Python Pika 사용

다음 자습서에서는 RabbitMQ용 Amazon MQ 브로커에 연결하도록 구성된 TLS를 사용하여 Python Pika 클라이언트를 설정하는 방법을 보여줍니다. Pika는 RabbitMQ용 AMQP 0-9-1 프로토콜의 Python 구현입니다. 이 자습서에서는 Pika를 설치하고, 대기열을 선언하고, 브로커의 기본 교환에 메시지를 전송하도록 게시자를 설정하고, 대기열에서 메시지를 수신하도록 소비자를 설정하는 방법을 안내합니다.

사전 조건

이 자습서를 완료하려면 다음과 같은 사전 조건이 필요합니다.

  • RabbitMQ용 Amazon MQ 브로커 자세한 내용은 RabbitMQ용 Amazon MQ 브로커 생성을 참조하세요.

  • 운영 체제에 맞춰 설치된 Python 3

  • Python pip를 사용하여 설치된 Pika Pika를 설치하려면 새 터미널 창을 열고 다음을 실행합니다.

    $ python3 -m pip install pika

권한

이 자습서를 진행하려면 vhost에 대한 쓰기 및 읽기 권한이 있는 RabbitMQ용 Amazon MQ 브로커 사용자가 한 명 이상 필요합니다. 다음 표는 필요한 최소 권한을 정규식 패턴으로 보여줍니다.

태그 regexp 구성 regexp 쓰기 regexp 읽기
none .* .*

나열된 사용자 권한은 사용자에게 읽기 및 쓰기 권한만 제공합니다. 브로커에 대한 관리 작업을 수행할 수 있는 관리 플러그 인에 대한 액세스 권한은 부여되지 않습니다. 사용자의 액세스를 지정된 대기열로 제한하는 정규 표현식 패턴을 사용하여 권한을 추가적으로 제한할 수 있습니다. 예를 들어 읽기 정규 표현식 패턴을 ^[hello world].*로 변경하면 해당 사용자에게는 hello world로 시작하는 대기열에서 읽을 수 있는 권한만 부여됩니다.

RabbitMQ 사용자를 생성하고 사용자 태그 및 권한을 관리하는 방법에 대한 자세한 내용은 RabbitMQ 브로커 사용자를 위한 Amazon MQ RabbitMQ 단원을 참조하세요.

1단계: 기본 Python Pika 클라이언트 생성

RabbitMQ용 Amazon MQ 브로커와 상호 작용할 때 생성자를 정의하고 TLS 구성에 필요한 SSL 컨텍스트를 제공하는 Python Pika 클라이언트 기본 클래스를 생성하려면 다음을 수행합니다.

  1. 새 터미널 창을 열고 프로젝트에 대한 새 디렉터리를 생성한 다음, 해당 디렉터리로 이동합니다.

    $ mkdir pika-tutorial $ cd pika-tutorial
  2. 다음의 Python 코드가 포함된 basicClient.py라는 이름의 파일을 생성합니다.

    import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()

이제 BasicPikaClient에서 상속된 게시자 및 소비자에 대한 추가 클래스를 정의할 수 있습니다.

2단계: 게시자 생성 및 메시지 전송

대기열을 선언하는 게시자를 생성하고 단일 메시지를 전송하려면 다음을 수행합니다.

  1. 다음 코드 샘플의 내용을 복사하고 로컬의 이전 단계에서 생성한 것과 동일한 디렉터리에 publisher.py로 저장합니다.

    from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()

    BasicMessageSender 클래스는 BasicPikaClient에서 상속하여 대기열을 선언하고, 대기열에 메시지를 전송하고, 연결을 닫는 추가 메서드를 구현합니다. 코드 샘플은 대기열 이름과 동일한 라우팅 키를 사용하여 메시지를 기본 교환으로 라우팅합니다.

  2. if __name__ == "__main__": 아래에서 다음 정보가 포함된 BasicMessageSender 생성자 문으로 전달되는 파라미터를 교체합니다.

    • <broker-id> - Amazon MQ가 브로커에 대해 생성하는 고유한 ID입니다. 브로커 ARN에서 ID를 구문 분석할 수 있습니다. 예를 들어 ARN이 arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9인 경우 브로커 ID는 b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9입니다.

    • <username> - 브로커에 메시지를 쓸 수 있는 충분한 권한이 있는 브로커 사용자의 사용자 이름입니다.

    • <password> - 브로커에 메시지를 쓸 수 있는 충분한 권한이 있는 브로커 사용자의 암호입니다.

    • <region> - RabbitMQ용 Amazon MQ 브로커를 생성한 AWS 리전입니다. 예: us-west-2.

  3. publisher.py를 생성한 것과 동일한 디렉터리에서 다음 명령을 실행합니다.

    $ python3 publisher.py

    코드가 성공적으로 실행되면 터미널 창에 다음과 같이 출력됩니다.

    Trying to declare queue(hello world queue)...
    Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'

3단계: 소비자 생성 및 메시지 수신

대기열에서 단일 메시지를 수신하는 소비자를 생성하려면 다음을 수행합니다.

  1. 다음 코드 샘플의 내용을 복사하고 로컬의 동일한 디렉터리에 consumer.py로 저장합니다.

    from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()

    이전 단계에서 생성한 게시자와 유사하게, BasicMessageRecieverBasicPikaClient에서 상속하여 단일 메시지를 수신하고 연결을 닫는 추가 메서드를 구현합니다.

  2. if __name__ == "__main__": 문 아래에서 BasicMessageReciever 생성자로 전달되는 파라미터를 사용자의 정보로 교체합니다.

  3. 프로젝트 디렉터리에서 다음 명령을 실행합니다.

    $ python3 consumer.py

    코드가 성공적으로 실행되면 메시지 본문과 라우팅 키를 포함한 헤더가 터미널 창에 표시됩니다.

    <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'

4단계: (선택 사항) 이벤트 루프 설정 및 메시지 소비

대기열에서 여러 메시지를 소비하려면 다음과 같이 Pika의 basic_consume 메서드 및 콜백 함수를 사용합니다.

  1. consumer.py에서 BasicMessageReceiver 클래스에 다음 메서드 정의를 추가합니다.

    def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
  2. consumer.pyif __name__ == "__main__": 아래에서 이전 단계에서 정의한 consume_messages 메서드를 호출합니다.

    if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
  3. consumer.py를 다시 실행하고, 성공하면 대기 중인 메시지가 터미널 창에 표시됩니다.

    [*] Waiting for messages. To exit press CTRL+C
    [x] Received b'Hello World!'
    [x] Received b'Hello World!'
    ...

다음 단계

  • 지원되는 기타 RabbitMQ 클라이언트 라이브러리에 대한 자세한 내용은 RabbitMQ 웹 사이트에서 RabbitMQ 클라이언트 설명서를 참조하세요.