本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
搭配 ActiveMQ 使用 Java Message Service (JMS) 的工作範例
以下範例顯示如何以程式設計方式使用 ActiveMQ:
-
Java 程式碼 OpenWire 範例會連線至代理程式、建立佇列,以及傳送和接收訊息。如需細節和詳細說明,請參閱 Connecting a Java application to your broker。
-
Java 程式碼MQTT範例會連線至代理程式、建立主題,以及發佈和接收訊息。
-
STOMP+WSS 範例 Java 程式碼會連線至代理程式、建立佇列,以及發佈和接收訊息。
必要條件
啟用VPC屬性
為了確保您的代理程式可在 內存取VPC,您必須啟用 enableDnsHostnames
和 enableDnsSupport
VPC 屬性。如需詳細資訊,請參閱 Amazon VPC使用者指南 DNS 中的 支援VPC。
啟用傳入連線
若要以程式設計方式使用 Amazon MQ,您必須使用傳入連線。
登入 Amazon MQ 主控台
。 從代理程式清單中,選擇代理程式的名稱 (例如 MyBroker)。
-
在 上
MyBroker
頁面中,在連線區段中,記下代理程式 Web 主控台和線層級通訊協定的地址URL和連接埠。 -
在 Details (詳細資訊) 區段的 Security and network (安全與網路) 下,選擇您的安全群組名稱或 。
EC2 儀表板的安全群組頁面隨即顯示。
-
從安全群組清單選擇您的安全群組。
-
在頁面的最下方,選擇 Inbound (傳入),然後選擇 Edit (編輯)。
-
在編輯傳入規則對話方塊中,為您要公開存取的每個 URL或端點新增規則 (下列範例顯示如何為代理程式 Web 主控台執行此操作)。
-
選擇 Add Rule (新增規則)。
-
對於類型 ,選取自訂 TCP。
-
針對 Port Range (連接埠範圍),輸入 Web 主控台連接埠 (
8162
)。 -
針對 Source (來源),讓 Custom (自訂) 保持已選取狀態,然後輸入您希望能夠存取 Web 主控台的系統 IP 地址 (例如,
192.0.2.1
)。 -
選擇 Save (儲存)。
您的代理程式現在已可接受傳入連線。
-
新增 Java 相依性
- OpenWire
-
將
activemq-client.jar
和activemq-pool.jar
套件新增到您的 Java 類別路徑。以下範例顯示 Maven 專案pom.xml
檔案中的此等依存關係。<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>
如需
activemq-client.jar
的詳細資訊,請參閱 Apache ActiveMQ 文件中的初始組態。 - MQTT
-
將
org.eclipse.paho.client.mqttv3.jar
套件新增至您的 Java 類別路徑。以下範例顯示 Maven 專案pom.xml
檔案中的此一依存關係。<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies>
如需
org.eclipse.paho.client.mqttv3.jar
的詳細資訊,請參閱 Eclipse Paho Java Client。 - STOMP+WSS
-
將以下套件新增到您的 Java 類別路徑:
-
spring-messaging.jar
-
spring-websocket.jar
-
javax.websocket-api.jar
-
jetty-all.jar
-
slf4j-simple.jar
-
jackson-databind.jar
以下範例顯示 Maven 專案
pom.xml
檔案中的此等依存關係。<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>5.0.5.RELEASE</version> </dependency> <dependency> <groupId>javax.websocket</groupId> <artifactId>javax.websocket-api</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>jetty-all</artifactId> <type>pom</type> <version>9.3.3.v20150827</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.5.0</version> </dependency> </dependencies>
如需詳細資訊,請參閱春季架構文件中的STOMP支援
。 -
mazonMQExample.java
重要
在以下的範例程式碼中,生產者和消費者會在單一執行緒中執行。對於生產系統 (或測試代理程式執行個體容錯移轉),請確定您的生產者和消費者在不同的主機或執行緒上執行。
- OpenWire
-
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.jms.pool.PooledConnectionFactory; import javax.jms.*; public class AmazonMQExample { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "
ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617
"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123
"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456
"; public static void main(String[] args) throws JMSException { final ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory(); final PooledConnectionFactory pooledConnectionFactory = createPooledConnectionFactory(connectionFactory); sendMessage(pooledConnectionFactory); receiveMessage(connectionFactory); pooledConnectionFactory.stop(); } private static void sendMessage(PooledConnectionFactory pooledConnectionFactory) throws JMSException { // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory .createConnection(); producerConnection.start(); // Create a session. final Session producerSession = producerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession .createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession .createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Create a message. final String text = "Hello from Amazon MQ!"; final TextMessage producerMessage = producerSession .createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent."); // Clean up the producer. producer.close(); producerSession.close(); producerConnection.close(); } private static void receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException { // Establish a connection for the consumer. // Note: Consumers should not use PooledConnectionFactory. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); // Create a session. final Session consumerSession = consumerConnection .createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession .createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession .createConsumer(consumerDestination); // Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText()); // Clean up the consumer. consumer.close(); consumerSession.close(); consumerConnection.close(); } private static PooledConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); return pooledConnectionFactory; } private static ActiveMQConnectionFactory createActiveMQConnectionFactory() { // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT); // Pass the sign-in credentials. connectionFactory.setUserName(ACTIVE_MQ_USERNAME); connectionFactory.setPassword(ACTIVE_MQ_PASSWORD); return connectionFactory; } } - MQTT
-
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.eclipse.paho.client.mqttv3.*; public class AmazonMQExampleMqtt implements MqttCallback { // Specify the connection parameters. private final static String WIRE_LEVEL_ENDPOINT = "
ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883
"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123
"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456
"; public static void main(String[] args) throws Exception { new AmazonMQExampleMqtt().run(); } private void run() throws MqttException, InterruptedException { // Specify the topic name and the message text. final String topic = "myTopic"; final String text = "Hello from Amazon MQ!"; // Create the MQTT client and specify the connection options. final String clientId = "abc123"; final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId); final MqttConnectOptions connOpts = new MqttConnectOptions(); // Pass the sign-in credentials. connOpts.setUserName(ACTIVE_MQ_USERNAME); connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray()); // Create a session and subscribe to a topic filter. client.connect(connOpts); client.setCallback(this); client.subscribe("+"); // Create a message. final MqttMessage message = new MqttMessage(text.getBytes()); // Publish the message to a topic. client.publish(topic, message); System.out.println("Published message."); // Wait for the message to be received. Thread.sleep(3000L); // Clean up the connection. client.disconnect(); } @Override public void connectionLost(Throwable cause) { System.out.println("Lost connection."); } @Override public void messageArrived(String topic, MqttMessage message) throws MqttException { System.out.println("Received message from topic " + topic + ": " + message); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivered message."); } } - STOMP+WSS
-
/* * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.simp.stomp.*; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient; import java.lang.reflect.Type; public class AmazonMQExampleStompWss { // Specify the connection parameters. private final static String DESTINATION = "/queue"; private final static String WIRE_LEVEL_ENDPOINT = "
wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619
"; private final static String ACTIVE_MQ_USERNAME = "MyUsername123
"; private final static String ACTIVE_MQ_PASSWORD = "MyPassword456
"; public static void main(String[] args) throws Exception { final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss(); final StompSession stompSession = example.connect(); System.out.println("Subscribed to a destination using session."); example.subscribeToDestination(stompSession); System.out.println("Sent message to session."); example.sendMessage(stompSession); Thread.sleep(60000); } private StompSession connect() throws Exception { // Create a client. final WebSocketClient client = new StandardWebSocketClient(); final WebSocketStompClient stompClient = new WebSocketStompClient(client); stompClient.setMessageConverter(new StringMessageConverter()); final WebSocketHttpHeaders headers = new WebSocketHttpHeaders(); // Create headers with authentication parameters. final StompHeaders head = new StompHeaders(); head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME); head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD); final StompSessionHandler sessionHandler = new MySessionHandler(); // Create a connection. return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head, sessionHandler).get(); } private void subscribeToDestination(final StompSession stompSession) { stompSession.subscribe(DESTINATION, new MyFrameHandler()); } private void sendMessage(final StompSession stompSession) { stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes()); } private static class MySessionHandler extends StompSessionHandlerAdapter { public void afterConnected(final StompSession stompSession, final StompHeaders stompHeaders) { System.out.println("Connected to broker."); } } private static class MyFrameHandler implements StompFrameHandler { public Type getPayloadType(final StompHeaders headers) { return String.class; } public void handleFrame(final StompHeaders stompHeaders, final Object message) { System.out.print("Received message from topic: " + message); } } }