기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
ActiveMQ를 통한 JMS(Java Message Service) 사용 예제
다음 예제에서는 ActiveMQ를 프로그래밍 방식으로 사용하는 방법을 보여 줍니다.
-
OpenWire 예시 Java 코드는 브로커에 연결하고, 대기열을 생성하고, 메시지를 보내고 받습니다. 자세한 설명과 분석은 Connecting a Java application to your broker 단원을 참조하세요.
-
MQTT 예제 Java 코드는 브로커에 연결하고, 주제를 생성하고, 메시지를 게시하고 받습니다.
-
STOMP+WSS 예제 Java 코드는 브로커에 연결하고, 대기열을 생성하고, 메시지를 게시하고 받습니다.
사전 조건
VPC 속성 활성화
VPC 내에서 브로커에 액세스할 수 있도록 하려면 enableDnsHostnames
및 enableDnsSupport
VPC 속성을 활성화해야 합니다. 자세한 내용은 Amazon VPC 사용 설명서의 VPC에서 DNS 지원을 참조하세요.
인바운드 연결 활성화
Amazon MQ를 프로그래밍 방식으로 사용하려면 인바운드 연결을 사용해야 합니다.
Amazon MQ 콘솔
에 로그인합니다. 브로커 목록에서 브로커 이름(예: MyBroker)을 선택합니다.
-
MyBroker
페이지의 Connections(연결) 섹션에서 브로커의 웹 콘솔 URL 및 와이어 레벨 프로토콜의 주소 및 포트를 적어둡니다. -
세부 정보 섹션의 보안 및 네트워크에서 보안 그룹의 이름 또는
을 선택합니다.
EC2 Dashboard의 보안 그룹 페이지가 표시됩니다.
-
보안 그룹 목록에서 보안 그룹을 선택합니다.
-
페이지 하단에서 인바운드를 선택한 후 편집을 선택합니다.
-
Edit inbound rules(인바운드 규칙 편집) 대화 상자에서 공개적으로 액세스하고자 하는 모든 URL 또는 엔드포인트에 대한 규칙을 추가합니다. 다음 예제에서는 브로커 웹 콘솔에 대해 이 작업을 수행하는 방법을 보여줍니다.
-
규칙 추가(Add Rule)를 선택합니다.
-
유형에서 Custom TCP(사용자 지정 TCP)를 선택합니다.
-
Port Range(포트 범위)에 웹 콘솔 포트(
8162
)를 입력합니다. -
Source(소스)에서 Custom(사용자 지정)을 선택한 상태에서 웹 콘솔에 액세스하는 데 사용할 시스템의 IP 주소(예:
192.0.2.1
)를 입력합니다. -
저장(Save)을 선택합니다.
이제 브로커가 인바운드 연결을 허용할 수 있습니다.
-
Java 종속성 추가
- OpenWire
-
activemq-client.jar
및activemq-pool.jar
패키지를 Java 클래스 경로에 추가합니다. 다음 예제는 Maven 프로젝트의pom.xml
파일 내에 존재하는 이러한 종속성을 보여줍니다.<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>
activemq-client.jar
에 대한 자세한 정보는 Apache ActiveMQ 설명서의 Initial Configuration을 참조하세요. - MQTT
-
org.eclipse.paho.client.mqttv3.jar
패키지를 Java 클래스 경로에 추가합니다. 다음 예제는 Maven 프로젝트pom.xml
파일의 이 종속성을 보여줍니다.<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies>
org.eclipse.paho.client.mqttv3.jar
에 대한 자세한 내용은 Eclipse Paho Java Client를 참조하세요. - STOMP+WSS
-
다음 패키지를 Java 클래스 경로에 추가합니다.
-
spring-messaging.jar
-
spring-websocket.jar
-
javax.websocket-api.jar
-
jetty-all.jar
-
slf4j-simple.jar
-
jackson-databind.jar
다음 예제는 Maven 프로젝트의
pom.xml
파일 내에 존재하는 이러한 종속성을 보여줍니다.<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>javax.websocket</groupId> <artifactId>javax.websocket-api</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>jetty-all</artifactId> <type>pom</type> <version>9.3.3.v20150827</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.5.0</version> </dependency> </dependencies>
자세한 내용은 Spring Framework 설명서의 STOMP Support
를 참조하세요. -
AmazonMQExample.java
중요
다음 예제 코드에서 생성자와 소비자는 단일 스레드에서 실행됩니다. 프로덕션 시스템(또는 브로커 인스턴스 장애 조치 테스트)의 경우 생산자와 소비자가 별도의 호스트 또는 스레드에서 실행되는지 확인합니다.
- OpenWire
-
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.jms.pool.PooledConnectionFactory; import javax.jms.*; public class AmazonMQExample { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "
ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617
"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123
"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456
"; public static void main(String[] args) throws JMSException { final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory(); final PooledConnectionFactory pooledConnectionFactory = createPooledConnectionFactory(connectionFactory); sendMessage(pooledConnectionFactory); receiveMessage(connectionFactory); pooledConnectionFactory.stop(); } private static void sendMessage(PooledConnectionFactory pooledConnectionFactory) throws JMSException { // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory .createConnection(); producerConnection.start(); // Create a session. final Session producerSession = producerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession .createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession .createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Create a message. final String text = "Hello from Amazon MQ!"; final TextMessage producerMessage = producerSession .createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent."); // Clean up the producer. producer.close(); producerSession.close(); producerConnection.close(); } private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException { // Establish a connection for the consumer. // Note: Consumers should not use PooledConnectionFactory. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); // Create a session. final Session consumerSession = consumerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession .createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession .createConsumer(consumerDestination); // Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText()); // Clean up the consumer. consumer.close(); consumerSession.close(); consumerConnection.close(); } private static PooledConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); return pooledConnectionFactory; } private static ActiveMQConnectionFactory createActiveMQConnectionFactory() { // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT); // Pass the sign-in credentials. connectionFactory.setUserName(ACTIVE_MQ_USERNAME); connectionFactory.setPassword(ACTIVE_MQ_PASSWORD); return connectionFactory; } } - MQTT
-
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.eclipse.paho.client.mqttv3.*; public class AmazonMQExampleMqtt implements MqttCallback { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "
ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883
"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123
"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456
"; public static void main(String[] args) throws Exception { new AmazonMQExampleMqtt().run(); } private void run() throws MqttException, InterruptedException { // Specify the topic name and the message text. final String topic = "myTopic"; final String text = "Hello from Amazon MQ!"; // Create the MQTT client and specify the connection options. final String clientId = "abc123"; final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId); final MqttConnectOptions connOpts = new MqttConnectOptions(); // Pass the sign-in credentials. connOpts.setUserName(ACTIVE_MQ_USERNAME); connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray()); // Create a session and subscribe to a topic filter. client.connect(connOpts); client.setCallback(this); client.subscribe("+"); // Create a message. final MqttMessage message = new MqttMessage(text.getBytes()); // Publish the message to a topic. client.publish(topic, message); System.out.println("Published message."); // Wait for the message to be received. Thread.sleep(3000L); // Clean up the connection. client.disconnect(); } @Override public void connectionLost(Throwable cause) { System.out.println("Lost connection."); } @Override public void messageArrived(String topic, MqttMessage message) throws MqttException { System.out.println("Received message from topic " + topic + ": " + message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivered message."); } } - STOMP+WSS
-
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.simp.stomp.*; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient; import java.lang.reflect.Type; public class AmazonMQExampleStompWss { // Specify the connection parameters. private final static String DESTINATION = "/queue"; private final static String WIRE_LEVEL_ENDPOINT = "
wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619
"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123
"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456
"; public static void main(String[] args) throws Exception { final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss(); final StompSession stompSession = example.connect(); System.out.println("Subscribed to a destination using session."); example.subscribeToDestination(stompSession); System.out.println("Sent message to session."); example.sendMessage(stompSession); Thread.sleep(60000); } private StompSession connect() throws Exception { // Create a client. final WebSocketClient client = new StandardWebSocketClient(); final WebSocketStompClient stompClient = new WebSocketStompClient(client); stompClient.setMessageConverter(new StringMessageConverter()); final WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); // Create headers with authentication parameters. final StompHeaders head = new StompHeaders(); head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME); head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD); final StompSessionHandler sessionHandler = new MySessionHandler(); // Create a connection. return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head, sessionHandler).get(); } private void subscribeToDestination(final StompSession stompSession) { stompSession.subscribe(DESTINATION, new MyFrameHandler()); } private void sendMessage(final StompSession stompSession) { stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes()); } private static class MySessionHandler extends StompSessionHandlerAdapter { public void afterConnected(final StompSession stompSession, final StompHeaders stompHeaders) { System.out.println("Connected to broker."); } } private static class MyFrameHandler implements StompFrameHandler { public Type getPayloadType(final StompHeaders headers) { return String.class; } public void handleFrame(final StompHeaders stompHeaders, final Object message) { System.out.print("Received message from topic: " + message); } } }