Working examples of using Java Message Service (JMS) with ActiveMQ
The following examples show how to work with ActiveMQ programmatically:
-
The OpenWire example Java code connects to a broker, creates a queue, and sends and receives a message. For a detailed breakdown and explanation, see Connecting a Java application to your broker.
-
The MQTT example Java code connects to a broker, creates a topic, and publishes and receives a message.
-
The STOMP+WSS example Java code connects to a broker, creates a queue, and publishes and receives a message.
Prerequisites
Enable VPC attributes
To ensure that your broker is accessible within your VPC, you must enable the enableDnsHostnames and enableDnsSupport VPC attributes. For more information, see DNS Support in your VPC in the Amazon VPC User Guide.
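One quick way to confirm that DNS is working for the broker is to resolve its host name from a machine inside the VPC. The following is a minimal sketch that uses only the Java standard library. The class name BrokerDnsCheck and the default localhost endpoint are placeholders introduced here for illustration; pass your own wire-level endpoint as the first argument.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

final class BrokerDnsCheck {

    // Extracts the host name from a wire-level endpoint such as "ssl://host:61617".
    static String hostOf(String endpoint) {
        String host = URI.create(endpoint).getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in endpoint: " + endpoint);
        }
        return host;
    }

    // Returns true if the host name resolves to at least one IP address.
    static boolean resolves(String host) {
        try {
            return InetAddress.getAllByName(host).length > 0;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder endpoint (assumption): replace with your broker's endpoint.
        String endpoint = args.length > 0 ? args[0] : "ssl://localhost:61617";
        String host = hostOf(endpoint);
        System.out.println(host + (resolves(host) ? " resolves" : " does not resolve"));
    }
}
```

A failed lookup from inside the VPC is a hint, not proof, that one of the two attributes is disabled; other DNS or network settings can cause the same result.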
Enable inbound connections
To work with Amazon MQ programmatically, you must enable inbound connections.
-
Sign in to the Amazon MQ console.
-
From the broker list, choose the name of your broker (for example, MyBroker).
-
On the MyBroker page, in the Connections section, note the addresses and ports of the broker's web console URL and wire-level protocols.
-
In the Details section, under Security and network, choose the name of your security group.
The Security Groups page of the EC2 Dashboard is displayed.
-
From the security group list, choose your security group.
-
At the bottom of the page, choose Inbound, and then choose Edit.
-
In the Edit inbound rules dialog box, add a rule for every URL or endpoint that you want to be publicly accessible (the following example shows how to do this for a broker web console).
-
Choose Add Rule.
-
For Type, select Custom TCP.
-
For Port Range, type the web console port (8162).
-
For Source, leave Custom selected and then type the IP address of the system that you want to be able to access the web console (for example, 192.0.2.1).
-
Choose Save.
Your broker can now accept inbound connections.
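To check that an inbound rule took effect, you can attempt a plain TCP connection to the port from the system whose IP address you allowed. The following is a minimal sketch using only the Java standard library. The class name PortCheck, the 3-second timeout, and the localhost default are assumptions made for illustration; port 8162 is the web console port mentioned in the steps above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolved host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder values (assumption): pass your broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8162;
        boolean ok = isReachable(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is not reachable"));
    }
}
```

This only shows that the TCP handshake completes. It does not perform the TLS handshake or authenticate against the broker.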
Add Java dependencies
- OpenWire
-
Add the activemq-client.jar and activemq-pool.jar packages to your Java class path. The following example shows these dependencies in a Maven project pom.xml file.
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.16</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.16</version>
    </dependency>
</dependencies>
For more information about activemq-client.jar, see Initial Configuration in the Apache ActiveMQ documentation.
- MQTT
-
Add the org.eclipse.paho.client.mqttv3.jar package to your Java class path. The following example shows this dependency in a Maven project pom.xml file.
<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
For more information about org.eclipse.paho.client.mqttv3.jar, see Eclipse Paho Java Client.
- STOMP+WSS
-
Add the following packages to your Java class path:
-
spring-messaging.jar
-
spring-websocket.jar
-
javax.websocket-api.jar
-
jetty-all.jar
-
slf4j-simple.jar
-
jackson-databind.jar
The following example shows these dependencies in a Maven project pom.xml file.
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>5.0.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-websocket</artifactId>
        <version>5.0.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>javax.websocket</groupId>
        <artifactId>javax.websocket-api</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.jetty.aggregate</groupId>
        <artifactId>jetty-all</artifactId>
        <type>pom</type>
        <version>9.3.3.v20150827</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>
For more information, see STOMP Support in the Spring Framework documentation.
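Before running any of the examples, it can help to confirm that the client libraries really are on the class path. The following is a minimal sketch that probes for one class from each protocol's client library; the three class names are the ones imported by the example listings on this page, while the helper class ClasspathCheck is a hypothetical name introduced here.

```java
final class ClasspathCheck {

    // Returns true if the named class can be found on the current class path.
    static boolean isPresent(String className) {
        try {
            // "false" means: locate the class without running its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.activemq.ActiveMQConnectionFactory",                // OpenWire
            "org.eclipse.paho.client.mqttv3.MqttClient",                    // MQTT
            "org.springframework.web.socket.messaging.WebSocketStompClient" // STOMP+WSS
        };
        for (String name : required) {
            System.out.println((isPresent(name) ? "found    " : "MISSING  ") + name);
        }
    }
}
```

You only need the libraries for the protocol you intend to use, so a MISSING line for another protocol is not an error.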
AmazonMQExample.java
In the following example code, producers and consumers run in a single thread. For production systems (or to test broker instance failover), make sure that your producers and consumers run on separate hosts or threads.
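The following is a minimal sketch of the threading pattern only, not of the messaging code itself. To keep it runnable without a broker, a java.util.concurrent.BlockingQueue stands in for the broker queue; that stand-in, the class name SeparateThreadsSketch, and the timeouts are all assumptions made for illustration. In real code, each thread would open its own JMS connection and session, because a JMS session is not meant to be shared between threads.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class SeparateThreadsSketch {

    // Runs a producer and a consumer on separate threads and returns what the
    // consumer received, or null if nothing arrived within the timeout.
    static String roundTrip(String text) throws Exception {
        // Stand-in for the broker queue (assumption): real code would use JMS here.
        BlockingQueue<String> standInQueue = new LinkedBlockingQueue<>();

        // Consumer task: in real code, create the consumer connection and session here.
        Callable<String> consumer = () -> standInQueue.poll(5, TimeUnit.SECONDS);
        // Producer task: in real code, create the producer connection and session here.
        Runnable producer = () -> standInQueue.offer(text);

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> received = pool.submit(consumer);
            pool.submit(producer);
            return received.get(10, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Message received: " + roundTrip("Hello from Amazon MQ!"));
    }
}
```

Starting the consumer first means it is already waiting when the message is produced, which is closer to how a long-running consumer behaves than the send-then-receive order of the single-threaded listings.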
- OpenWire
-
/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import javax.jms.*;

public class AmazonMQExample {

    // Specify the connection parameters.
    private final static String WIRE_LEVEL_ENDPOINT
            = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    public static void main(String[] args) throws JMSException {
        final ActiveMQConnectionFactory connectionFactory =
                createActiveMQConnectionFactory();
        final PooledConnectionFactory pooledConnectionFactory =
                createPooledConnectionFactory(connectionFactory);

        sendMessage(pooledConnectionFactory);
        receiveMessage(connectionFactory);

        pooledConnectionFactory.stop();
    }

    private static void
    sendMessage(PooledConnectionFactory pooledConnectionFactory) throws JMSException {
        // Establish a connection for the producer.
        final Connection producerConnection = pooledConnectionFactory
                .createConnection();
        producerConnection.start();

        // Create a session.
        final Session producerSession = producerConnection
                .createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a queue named "MyQueue".
        final Destination producerDestination = producerSession
                .createQueue("MyQueue");

        // Create a producer from the session to the queue.
        final MessageProducer producer = producerSession
                .createProducer(producerDestination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // Create a message.
        final String text = "Hello from Amazon MQ!";
        final TextMessage producerMessage = producerSession
                .createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();
    }

    private static void
    receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException {
        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        final Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        final Session consumerSession = consumerConnection
                .createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a queue named "MyQueue".
        final Destination consumerDestination = consumerSession
                .createQueue("MyQueue");

        // Create a message consumer from the session to the queue.
        final MessageConsumer consumer = consumerSession
                .createConsumer(consumerDestination);

        // Begin to wait for messages.
        final Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        final TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
    }

    private static PooledConnectionFactory
    createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        // Create a pooled connection factory.
        final PooledConnectionFactory pooledConnectionFactory =
                new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);
        return pooledConnectionFactory;
    }

    private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        // Create a connection factory.
        final ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

        // Pass the sign-in credentials.
        connectionFactory.setUserName(ACTIVE_MQ_USERNAME);
        connectionFactory.setPassword(ACTIVE_MQ_PASSWORD);
        return connectionFactory;
    }
}
- MQTT
-
/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */
import org.eclipse.paho.client.mqttv3.*;

public class AmazonMQExampleMqtt implements MqttCallback {

    // Specify the connection parameters.
    private final static String WIRE_LEVEL_ENDPOINT =
            "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    public static void main(String[] args) throws Exception {
        new AmazonMQExampleMqtt().run();
    }

    private void run() throws MqttException, InterruptedException {
        // Specify the topic name and the message text.
        final String topic = "myTopic";
        final String text = "Hello from Amazon MQ!";

        // Create the MQTT client and specify the connection options.
        final String clientId = "abc123";
        final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId);
        final MqttConnectOptions connOpts = new MqttConnectOptions();

        // Pass the sign-in credentials.
        connOpts.setUserName(ACTIVE_MQ_USERNAME);
        connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray());

        // Create a session and subscribe to a topic filter.
        client.connect(connOpts);
        client.setCallback(this);
        client.subscribe("+");

        // Create a message.
        final MqttMessage message = new MqttMessage(text.getBytes());

        // Publish the message to a topic.
        client.publish(topic, message);
        System.out.println("Published message.");

        // Wait for the message to be received.
        Thread.sleep(3000L);

        // Clean up the connection.
        client.disconnect();
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Lost connection.");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        System.out.println("Received message from topic " + topic + ": " + message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Delivered message.");
    }
}
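The MQTT listing subscribes to the topic filter "+" and then publishes to "myTopic". The broker performs the filter matching; the following sketch only illustrates the MQTT wildcard rules so that it is clear why that subscription receives the message. The class name TopicFilterSketch is hypothetical, and the sketch does not handle the special treatment of topic names that begin with "$".

```java
final class TopicFilterSketch {

    // Returns true if the topic name matches the MQTT topic filter.
    // "+" matches exactly one level. "#" matches the parent level and any
    // number of child levels, and is only valid as the last level of a filter.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1; // "#" must be the last level
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        return f.length == t.length; // no unmatched topic levels remain
    }

    public static void main(String[] args) {
        System.out.println(matches("+", "myTopic"));      // single-level topic
        System.out.println(matches("+", "myTopic/sub"));  // two-level topic
        System.out.println(matches("#", "myTopic/sub"));  // multi-level wildcard
    }
}
```

Because "+" covers only one level, the example's subscription would not see messages published to a nested topic such as "myTopic/sub"; a filter of "#" would.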
- STOMP+WSS
-
/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.*;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import java.lang.reflect.Type;

public class AmazonMQExampleStompWss {

    // Specify the connection parameters.
    private final static String DESTINATION = "/queue";
    private final static String WIRE_LEVEL_ENDPOINT =
            "wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    public static void main(String[] args) throws Exception {
        final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss();

        final StompSession stompSession = example.connect();
        System.out.println("Subscribed to a destination using session.");
        example.subscribeToDestination(stompSession);

        System.out.println("Sent message to session.");
        example.sendMessage(stompSession);
        Thread.sleep(60000);
    }

    private StompSession connect() throws Exception {
        // Create a client.
        final WebSocketClient client = new StandardWebSocketClient();
        final WebSocketStompClient stompClient = new WebSocketStompClient(client);
        stompClient.setMessageConverter(new StringMessageConverter());

        final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();

        // Create headers with authentication parameters.
        final StompHeaders head = new StompHeaders();
        head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME);
        head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD);

        final StompSessionHandler sessionHandler = new MySessionHandler();

        // Create a connection.
        return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head,
                sessionHandler).get();
    }

    private void subscribeToDestination(final StompSession stompSession) {
        stompSession.subscribe(DESTINATION, new MyFrameHandler());
    }

    private void sendMessage(final StompSession stompSession) {
        stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes());
    }

    private static class MySessionHandler extends StompSessionHandlerAdapter {
        public void afterConnected(final StompSession stompSession,
                                   final StompHeaders stompHeaders) {
            System.out.println("Connected to broker.");
        }
    }

    private static class MyFrameHandler implements StompFrameHandler {
        public Type getPayloadType(final StompHeaders headers) {
            return String.class;
        }

        public void handleFrame(final StompHeaders stompHeaders,
                                final Object message) {
            System.out.print("Received message from topic: " + message);
        }
    }
}
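The STOMP+WSS listing passes the credentials as login and passcode headers. Spring builds and sends the real frames, so the following is only a rough sketch of the STOMP wire format to show where those headers end up. It assumes STOMP 1.2, does no header escaping, and uses a hypothetical class name, StompFrameSketch, and hypothetical example values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StompFrameSketch {

    // Builds a STOMP frame: command line, header lines, blank line, body,
    // then a NUL character that terminates the frame.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    // Builds a CONNECT frame carrying the sign-in credentials.
    static String connectFrame(String host, String login, String passcode) {
        Map<String, String> headers = new LinkedHashMap<>(); // keeps insertion order
        headers.put("accept-version", "1.2");
        headers.put("host", host);
        headers.put("login", login);
        headers.put("passcode", passcode);
        return frame("CONNECT", headers, "");
    }

    public static void main(String[] args) {
        // Placeholder values (assumption), shown with the NUL made visible.
        String frame = connectFrame("broker.example.com", "MyUsername123", "MyPassword456");
        System.out.println(frame.replace("\0", "^@"));
    }
}
```

The passcode travels in clear text inside the frame, which is one reason the example connects over wss rather than an unencrypted WebSocket.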