本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Java 应用程序连接到您的 Amazon MQ 代理
创建 Amazon MQ ActiveMQ 代理后,您可以将应用程序连接到该代理。以下示例演示如何使用 Java Message Service(JMS)创建代理连接、创建队列以及发送消息。有关完整的可用 Java 示例,请参阅Working Java Example。
您可以使用各种 ActiveMQ 客户端
先决条件
启用 VPC 属性
要确保您的代理可以在您的 VPC 中访问,您必须启用 enableDnsHostnames
和 enableDnsSupport
VPC 属性。有关更多信息,请参阅《Amazon VPC 用户指南》中的 VPC 中的 DNS Support。
启用入站连接
接下来,为您的应用程序启用入站连接。
登录 Amazon MQ 控制台
。 从经纪人列表中,选择您的经纪商名称(例如 MyBroker)。
-
在该
MyBroker
页面的 “连接” 部分,记下代理的 Web 控制台 URL 和线级协议的地址和端口。 -
在 Details (详细信息) 部分的 Security and network (安全与网络) 下,选择您的安全组名称或
。
屏幕上将显示 EC2 控制面板的 “安全组” 页面。
-
从安全组列表中,选择您的安全组。
-
在页面底部,选择 Inbound (入站),然后选择 Edit (编辑)。
-
在 Edit inbound rules (编辑入站规则) 对话框中,为希望公开访问的每个 URL 或终端节点添加规则(以下示例显示如何为代理 Web 控制台执行此操作。
-
选择添加规则。
-
对于 Type (类型),选择 Custom TCP (自定义 TCP)。
-
对于 Port Range (端口范围),键入 Web 控制台端口(
8162
)。 -
对于 Source (源),选择 Custom (自定义),然后键入您希望能够访问 Web 控制台的系统的 IP 地址(例如
192.0.2.1
)。 -
选择保存。
您的代理现在可以接受入站连接。
-
添加 Java 依赖项
将 activemq-client.jar
和 activemq-pool.jar
程序包添加到 Java 类路径中。以下示例说明了 Maven 项目的 pom.xml
文件中的这些依赖关系。
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>
有关 activemq-client.jar
的更多信息,请参阅 Apache ActiveMQ 文档中的初始配置
重要
在以下示例代码中,生产者和使用者在单个线程中运行。对于生产系统(或测试代理实例故障转移),请确保您的创建者和使用者在单独的主机或线程上运行。
创建消息创建者并发送消息
使用以下说明创建消息生产者并接收消息。
-
使用代理的终端节点为消息创建者创建 JMS 池连接工厂,然后针对该工厂调用
createConnection
方法。注意
对于主用/备用代理,Amazon MQ 提供了两个 ActiveMQ Web URLs 控制台,但一次只有一个 URL 处于活动状态。同样,Amazon MQ 为每个线级协议提供两个终端节点,但每次每对中只有一个终端节点处于活动状态。
-1
和-2
后缀表示冗余对。有关更多信息,请参阅 Amazon MQ for ActiveMQ 代理的部署选项。对于线级协议终端节点,您可以允许应用程序使用故障转移传输
连接到任一终端节点。 // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start(); // Close all connections in the pool. pooledConnectionFactory.clear();
注意
消息创建者应始终使用
PooledConnectionFactory
类。有关更多信息,请参阅 始终使用连接池。 -
创建一个会话,一个名为
MyQueue
的队列和消息创建者。// Create a session. final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession.createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
创建消息字符串
"Hello from Amazon MQ!"
,然后发送消息。// Create a message. final String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent.");
-
清理创建者。
producer.close(); producerSession.close(); producerConnection.close();
创建消息使用者并接收消息
使用以下说明创建消息生产者并接收消息。
-
使用代理的终端节点为消息创建者创建 JMS 连接工厂,然后针对该工厂调用
createConnection
方法。// Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();
注意
消息使用者绝不 应使用
PooledConnectionFactory
类。有关更多信息,请参阅 始终使用连接池。 -
创建一个会话,一个名为
MyQueue
的队列和消息使用者。// Create a session. final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession.createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
-
开始等待消息,并在消息到达时收到消息。
// Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText());
注意
与 AWS 消息服务(例如 Amazon SQS)不同,消费者经常与经纪人建立联系。
-
关闭使用者、会话和连接。
consumer.close(); consumerSession.close(); consumerConnection.close();