將 Java 應用程式連線到您的 Amazon MQ 代理程式 - Amazon MQ

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將 Java 應用程式連線到您的 Amazon MQ 代理程式

建立 Amazon MQ ActiveMQ 代理程式後,您可以將應用程式連接到它。下列範例示範如何使用 Java 訊息服務 (JMS) 建立與代理程式的連線、建立佇列以及傳送訊息。如需完整的作用中 Java 範例,請參閱 Working Java Example

您可以使用多種 ActiveMQ 用戶端連線至 ActiveMQ 代理程式。建議使用 ActiveMQ 用戶端

必要條件

啟用 VPC 屬性

若要確保代理程式可以在 VPC 內存取,您必須啟用 enableDnsHostnamesenableDnsSupport VPC 屬性。如需詳細資訊,請參閱《Amazon VPC 使用者指南》中的 VPC 中的 DNS Support

啟用傳入連線

接著,為您的應用程式啟用傳入連線。

  1. 登入 Amazon MQ 主控台

  2. 從代理程式清單中,選擇您的代理程式名稱 (例如,MyBroker)。

  3. MyBroker 頁面的 Connections (連線) 區段中,記下代理程式 Web 主控台 URL 和線路通訊協定的位址和連接埠。

  4. Details (詳細資訊) 區段的 Security and network (安全與網路) 下,選擇您的安全群組名稱或 Pencil icon indicating an edit or modification action.

    隨即會顯示 EC2 儀表板的 Security Groups (安全群組) 頁面。

  5. 從安全群組清單選擇您的安全群組。

  6. 在頁面的最下方,選擇 Inbound (傳入),然後選擇 Edit (編輯)

  7. Edit inbound rules (編輯傳入規則) 對話方塊中,為您要公開存取的每個 URL 或端點新增規則 (下列範例顯示如何針對代理程式 Web 主控台執行此動作)。

    1. 選擇 Add Rule (新增規則)。

    2. 針對類型,選取自訂 TCP

    3. 針對 Port Range (連接埠範圍),輸入 Web 主控台連接埠 (8162)。

    4. 針對 Source (來源),讓 Custom (自訂) 保持已選取狀態,然後輸入您希望能夠存取 Web 主控台的系統 IP 地址 (例如,192.0.2.1)。

    5. 選擇 Save (儲存)

      您的代理程式現在已可接受傳入連線。

新增 Java 相依性

activemq-client.jaractivemq-pool.jar 套件新增到您的 Java 類別路徑。以下範例顯示 Maven 專案 pom.xml 檔案中的此等依存關係。

<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>

如需 activemq-client.jar 的詳細資訊,請參閱 Apache ActiveMQ 文件中的初始組態

重要

在以下的範例程式碼中,生產者和消費者會在單一執行緒中執行。對於生產系統 (或測試代理程式執行個體容錯移轉),請確定您的生產者和消費者在不同的主機或執行緒上執行。

建立訊息生產者和傳送訊息

使用下列指示來建立訊息製作者並接收訊息。

  1. 使用代理程式的端點為訊息生產者建立 JMS 集區連接工廠,然後對工廠呼叫 createConnection 方法。

    注意

    對於作用中/待命代理程式,Amazon MQ 會提供兩個 ActiveMQ Web 主控台 URL,但一次只有一個作用中的 URL。同樣地,Amazon MQ 為每個線路通訊協定提供兩個端點,但每個配對中一次只有一個作用中的端點。-1-2 尾碼表示備援組合。如需詳細資訊,請參閱 適用於 ActiveMQ 代理程式的 Amazon MQ 部署選項

    對於線路通訊協定端點,您可以允許應用程式使用容錯移轉傳輸連線到任一端點。

    // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start(); // Close all connections in the pool. pooledConnectionFactory.clear();
    注意

    訊息生產者應一律使用 PooledConnectionFactory 類別。如需詳細資訊,請參閱一律使用連線集區

  2. 建立工作階段、名為 MyQueue 的佇列和訊息生產者。

    // Create a session. final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession.createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  3. 建立訊息字串 "Hello from Amazon MQ!",然後傳送訊息。

    // Create a message. final String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent.");
  4. 清除生產者。

    producer.close(); producerSession.close(); producerConnection.close();

建立訊息消費者和接收訊息

使用下列指示來建立訊息製作者並接收訊息。

  1. 使用代理程式的端點為訊息生產者建立 JMS 連線工廠,然後對工廠呼叫 createConnection 方法。

    // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();
    注意

    訊息消費者不應使用 PooledConnectionFactory 類別。如需詳細資訊,請參閱一律使用連線集區

  2. 建立工作階段、名為 MyQueue 佇列和訊息消費者。

    // Create a session. final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession.createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
  3. 開始等待訊息,並在訊息送達時接收訊息。

    // Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText());
    注意

    與 AWS 簡訊服務 (例如 Amazon SQS) 不同,消費者會持續連接到代理程式。

  4. 關閉消費者、工作階段和連線。

    consumer.close(); consumerSession.close(); consumerConnection.close();