本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將 Java 應用程式連線到您的 Amazon MQ 代理程式
建立 Amazon MQ ActiveMQ 代理程式後,您可以將應用程式連接到它。下列範例示範如何使用 Java 訊息服務 (JMS) 建立與代理程式的連線、建立佇列以及傳送訊息。如需完整的作用中 Java 範例,請參閱 Working Java Example。
您可以使用多種 ActiveMQ 用戶端
必要條件
啟用 VPC 屬性
若要確保代理程式可以在 VPC 內存取,您必須啟用 enableDnsHostnames
和 enableDnsSupport
VPC 屬性。如需詳細資訊,請參閱《Amazon VPC 使用者指南》中的 VPC 中的 DNS Support。
啟用傳入連線
接著,為您的應用程式啟用傳入連線。
登入 Amazon MQ 主控台
。 從代理程式清單中,選擇您的代理程式名稱 (例如,MyBroker)。
-
在
MyBroker
頁面的 Connections (連線) 區段中,記下代理程式 Web 主控台 URL 和線路通訊協定的位址和連接埠。 -
在 Details (詳細資訊) 區段的 Security and network (安全與網路) 下,選擇您的安全群組名稱或
。
隨即會顯示 EC2 儀表板的 Security Groups (安全群組) 頁面。
-
從安全群組清單選擇您的安全群組。
-
在頁面的最下方,選擇 Inbound (傳入),然後選擇 Edit (編輯)。
-
在 Edit inbound rules (編輯傳入規則) 對話方塊中,為您要公開存取的每個 URL 或端點新增規則 (下列範例顯示如何針對代理程式 Web 主控台執行此動作)。
-
選擇 Add Rule (新增規則)。
-
針對類型,選取自訂 TCP。
-
針對 Port Range (連接埠範圍),輸入 Web 主控台連接埠 (
8162
)。 -
針對 Source (來源),讓 Custom (自訂) 保持已選取狀態,然後輸入您希望能夠存取 Web 主控台的系統 IP 地址 (例如,
192.0.2.1
)。 -
選擇 Save (儲存)。
您的代理程式現在已可接受傳入連線。
-
新增 Java 相依性
將 activemq-client.jar
和 activemq-pool.jar
套件新增到您的 Java 類別路徑。以下範例顯示 Maven 專案 pom.xml
檔案中的此等依存關係。
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>
如需 activemq-client.jar
的詳細資訊,請參閱 Apache ActiveMQ 文件中的初始組態
重要
在以下的範例程式碼中,生產者和消費者會在單一執行緒中執行。對於生產系統 (或測試代理程式執行個體容錯移轉),請確定您的生產者和消費者在不同的主機或執行緒上執行。
建立訊息生產者和傳送訊息
使用下列指示來建立訊息製作者並接收訊息。
-
使用代理程式的端點為訊息生產者建立 JMS 集區連接工廠,然後對工廠呼叫
createConnection
方法。注意
對於作用中/待命代理程式,Amazon MQ 會提供兩個 ActiveMQ Web 主控台 URL,但一次只有一個作用中的 URL。同樣地,Amazon MQ 為每個線路通訊協定提供兩個端點,但每個配對中一次只有一個作用中的端點。
-1
和-2
尾碼表示備援組合。如需詳細資訊,請參閱 適用於 ActiveMQ 代理程式的 Amazon MQ 部署選項。對於線路通訊協定端點,您可以允許應用程式使用容錯移轉傳輸
連線到任一端點。 // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start(); // Close all connections in the pool. pooledConnectionFactory.clear();
注意
訊息生產者應一律使用
PooledConnectionFactory
類別。如需詳細資訊,請參閱一律使用連線集區。 -
建立工作階段、名為
MyQueue
的佇列和訊息生產者。// Create a session. final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession.createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
建立訊息字串
"Hello from Amazon MQ!"
,然後傳送訊息。// Create a message. final String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent.");
-
清除生產者。
producer.close(); producerSession.close(); producerConnection.close();
建立訊息消費者和接收訊息
使用下列指示來建立訊息製作者並接收訊息。
-
使用代理程式的端點為訊息生產者建立 JMS 連線工廠,然後對工廠呼叫
createConnection
方法。// Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();
注意
訊息消費者不應使用
PooledConnectionFactory
類別。如需詳細資訊,請參閱一律使用連線集區。 -
建立工作階段、名為
MyQueue
佇列和訊息消費者。// Create a session. final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession.createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
-
開始等待訊息,並在訊息送達時接收訊息。
// Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText());
注意
與 AWS 簡訊服務 (例如 Amazon SQS) 不同,消費者會持續連接到代理程式。
-
關閉消費者、工作階段和連線。
consumer.close(); consumerSession.close(); consumerConnection.close();