Working examples of using Java Message Service (JMS) with ActiveMQ
The following examples show how to work with ActiveMQ programmatically:
- The OpenWire example Java code connects to a broker, creates a queue, and sends and receives a message. For a detailed breakdown and explanation, see Connecting a Java application to your broker.
- The MQTT example Java code connects to a broker, creates a topic, and publishes and receives a message.
- The STOMP+WSS example Java code connects to a broker, creates a queue, and publishes and receives a message.
Prerequisites
Enable VPC attributes
To ensure that your broker is accessible within your VPC, you must enable the enableDnsHostnames and enableDnsSupport VPC attributes. For more information, see DNS Support in your VPC in the Amazon VPC User Guide.
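A quick way to observe the effect of these attributes is to check whether a host name resolves from a machine inside the VPC. The following is a minimal sketch, not part of the original examples: the class name BrokerDnsCheck and the method resolves are hypothetical, and the host name to test is whatever you pass on the command line (for example, the host part of your broker endpoint).

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not part of the Amazon MQ examples).
final class BrokerDnsCheck {

    /** Returns true if the given host name resolves to at least one IP address. */
    static boolean resolves(String hostName) {
        try {
            return InetAddress.getAllByName(hostName).length > 0;
        } catch (UnknownHostException e) {
            // The name could not be resolved by the configured DNS servers.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the host part of your broker endpoint as the first argument.
        final String host = args.length > 0 ? args[0] : "localhost";
        System.out.println(host + " resolves: " + resolves(host));
    }
}
```

If the broker host name does not resolve from inside the VPC, the DNS attributes above are one thing to check, though other causes are possible.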
Enable inbound connections
To work with Amazon MQ programmatically, you must enable inbound connections to your broker.
- Sign in to the Amazon MQ console.
- From the broker list, choose the name of your broker (for example, MyBroker).
- On the MyBroker page, in the Connections section, note the addresses and ports of the broker's web console URL and wire-level protocols.
- In the Details section, under Security and network, choose the name of your security group. The Security Groups page of the EC2 Dashboard is displayed.
- From the security group list, choose your security group.
- At the bottom of the page, choose Inbound, and then choose Edit.
- In the Edit inbound rules dialog box, add a rule for every URL or endpoint that you want to be publicly accessible (the following example shows how to do this for a broker web console).
  - Choose Add Rule.
  - For Type, select Custom TCP.
  - For Port Range, type the web console port (8162).
  - For Source, leave Custom selected, and then type the IP address of the system that you want to be able to access the web console (for example, 192.0.2.1).
  - Choose Save.
Your broker can now accept inbound connections.
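After saving the rule, you may want to confirm from the client machine that the port is actually reachable. The following sketch attempts a plain TCP connection to the host and port of an endpoint string. The class name BrokerPortCheck and its method are hypothetical, and the check only shows that the port accepts TCP connections; it does not perform a TLS handshake or authenticate against the broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper (not part of the Amazon MQ examples).
final class BrokerPortCheck {

    /**
     * Returns true if a TCP connection to the endpoint's host and port
     * succeeds within the timeout. The endpoint must look like
     * scheme://host:port (for example, an ssl:// or wss:// endpoint).
     */
    static boolean isReachable(String endpoint, int timeoutMillis) {
        final URI uri = URI.create(endpoint);
        final String host = uri.getHost();
        final int port = uri.getPort();
        if (host == null || port == -1) {
            throw new IllegalArgumentException(
                    "Endpoint needs a host and a port: " + endpoint);
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or the host name did not resolve.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("Usage: java BrokerPortCheck <scheme://host:port>");
            return;
        }
        System.out.println(args[0] + " reachable: " + isReachable(args[0], 3000));
    }
}
```

A result of false can mean that the security group rule is missing, that the source IP address does not match the rule, or that the host name does not resolve.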
Add Java dependencies
- OpenWire
Add the activemq-client.jar and activemq-pool.jar packages to your Java class path. The following example shows these dependencies in a Maven project pom.xml file.
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.16</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.16</version>
    </dependency>
</dependencies>
For more information about activemq-client.jar, see Initial Configuration in the Apache ActiveMQ documentation.
- MQTT
Add the org.eclipse.paho.client.mqttv3.jar package to your Java class path. The following example shows this dependency in a Maven project pom.xml file.
<dependencies>
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
For more information about org.eclipse.paho.client.mqttv3.jar, see Eclipse Paho Java Client.
- STOMP+WSS
Add the following packages to your Java class path:
- spring-messaging.jar
- spring-websocket.jar
- javax.websocket-api.jar
- jetty-all.jar
- slf4j-simple.jar
- jackson-databind.jar
The following example shows these dependencies in a Maven project pom.xml file.
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>5.0.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-websocket</artifactId>
        <version>5.0.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>javax.websocket</groupId>
        <artifactId>javax.websocket-api</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>org.eclipse.jetty.aggregate</groupId>
        <artifactId>jetty-all</artifactId>
        <type>pom</type>
        <version>9.3.3.v20150827</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>
For more information, see STOMP Support in the Spring Framework documentation.
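To confirm that the dependencies actually ended up on the class path, one option is to try loading a class from each library at startup. The sketch below is an illustration only: the class name DependencyCheck is hypothetical, and the probed class names are taken from the import statements of the example code on this page.

```java
// Hypothetical helper (not part of the Amazon MQ examples).
final class DependencyCheck {

    /** Returns true if the named class can be found on the class path. */
    static boolean isOnClasspath(String className) {
        try {
            // 'false' means the class is located but not initialized.
            Class.forName(className, false, DependencyCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names as imported by the OpenWire, MQTT, and STOMP+WSS examples.
        final String[] probes = {
            "org.apache.activemq.ActiveMQConnectionFactory",
            "org.apache.activemq.jms.pool.PooledConnectionFactory",
            "org.eclipse.paho.client.mqttv3.MqttClient",
            "org.springframework.web.socket.messaging.WebSocketStompClient"
        };
        for (String probe : probes) {
            System.out.println(probe + ": " + (isOnClasspath(probe) ? "found" : "missing"));
        }
    }
}
```

You only need the libraries for the protocol you use, so "missing" for the other protocols is expected.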
AmazonMQExample.java
In the following example code, producers and consumers run in a single thread. For production systems (or to test broker instance failover), make sure that your producers and consumers run on separate hosts or threads.
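The note about separate threads can be illustrated with a small, self-contained sketch. It is an assumption-laden stand-in: an in-memory BlockingQueue plays the role of the broker so that the code runs without any messaging library, and the class name SeparateThreadsSketch is hypothetical. With a real broker, each task would hold its own connection and session, as in the examples on this page.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: an in-memory queue stands in for the broker.
final class SeparateThreadsSketch {

    /** Sends the text from one thread and receives it on another. */
    static String sendAndReceive(String text) throws Exception {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Consumer task: waits up to five seconds, returns null on timeout.
            final Callable<String> consumerTask = () -> queue.poll(5, TimeUnit.SECONDS);
            // Producer task: runs on a different thread than the consumer.
            final Runnable producerTask = () -> queue.add(text);

            final Future<String> received = pool.submit(consumerTask);
            pool.submit(producerTask);
            return received.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Message received: " + sendAndReceive("Hello from Amazon MQ!"));
    }
}
```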
- OpenWire
/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

import javax.jms.*;

public class AmazonMQExample {

    // Specify the connection parameters.
    private final static String WIRE_LEVEL_ENDPOINT
            = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    public static void main(String[] args) throws JMSException {
        final ActiveMQConnectionFactory connectionFactory =
                createActiveMQConnectionFactory();
        final PooledConnectionFactory pooledConnectionFactory =
                createPooledConnectionFactory(connectionFactory);

        sendMessage(pooledConnectionFactory);
        receiveMessage(connectionFactory);

        pooledConnectionFactory.stop();
    }

    private static void
    sendMessage(PooledConnectionFactory pooledConnectionFactory) throws JMSException {
        // Establish a connection for the producer.
        final Connection producerConnection = pooledConnectionFactory
                .createConnection();
        producerConnection.start();

        // Create a session.
        final Session producerSession = producerConnection
                .createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a queue named "MyQueue".
        final Destination producerDestination = producerSession
                .createQueue("MyQueue");

        // Create a producer from the session to the queue.
        final MessageProducer producer = producerSession
                .createProducer(producerDestination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // Create a message.
        final String text = "Hello from Amazon MQ!";
        final TextMessage producerMessage = producerSession
                .createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();
    }

    private static void
    receiveMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException {
        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        final Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        final Session consumerSession = consumerConnection
                .createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a queue named "MyQueue".
        final Destination consumerDestination = consumerSession
                .createQueue("MyQueue");

        // Create a message consumer from the session to the queue.
        final MessageConsumer consumer = consumerSession
                .createConsumer(consumerDestination);

        // Wait up to one second for a message to arrive.
        final Message consumerMessage = consumer.receive(1000);

        // receive() returns null if the timeout expires before a message arrives.
        if (consumerMessage instanceof TextMessage) {
            final TextMessage consumerTextMessage = (TextMessage) consumerMessage;
            System.out.println("Message received: " + consumerTextMessage.getText());
        } else {
            System.out.println("No text message received within the timeout.");
        }

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
    }

    private static PooledConnectionFactory
    createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        // Create a pooled connection factory.
        final PooledConnectionFactory pooledConnectionFactory =
                new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);
        return pooledConnectionFactory;
    }

    private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        // Create a connection factory.
        final ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

        // Pass the sign-in credentials.
        connectionFactory.setUserName(ACTIVE_MQ_USERNAME);
        connectionFactory.setPassword(ACTIVE_MQ_PASSWORD);
        return connectionFactory;
    }
}
- MQTT
/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

import org.eclipse.paho.client.mqttv3.*;

public class AmazonMQExampleMqtt implements MqttCallback {

    // Specify the connection parameters.
    private final static String WIRE_LEVEL_ENDPOINT =
            "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    public static void main(String[] args) throws Exception {
        new AmazonMQExampleMqtt().run();
    }

    private void run() throws MqttException, InterruptedException {
        // Specify the topic name and the message text.
        final String topic = "myTopic";
        final String text = "Hello from Amazon MQ!";

        // Create the MQTT client and specify the connection options.
        final String clientId = "abc123";
        final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId);
        final MqttConnectOptions connOpts = new MqttConnectOptions();

        // Pass the sign-in credentials.
        connOpts.setUserName(ACTIVE_MQ_USERNAME);
        connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray());

        // Create a session and subscribe to a topic filter.
        client.connect(connOpts);
        client.setCallback(this);
        client.subscribe("+");

        // Create a message.
        final MqttMessage message = new MqttMessage(text.getBytes());

        // Publish the message to a topic.
        client.publish(topic, message);
        System.out.println("Published message.");

        // Wait for the message to be received.
        Thread.sleep(3000L);

        // Clean up the connection.
        client.disconnect();
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Lost connection.");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        System.out.println("Received message from topic " + topic + ": " + message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Delivered message.");
    }
}
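The MQTT example subscribes to the topic filter "+" and publishes to "myTopic". In MQTT, "+" matches exactly one topic level and "#" matches any number of trailing levels, which is why the subscriber receives the message. The following sketch illustrates that matching rule. It is a simplified illustration, not the broker's or the Paho client's implementation: the class name TopicFilterSketch is hypothetical, the filter is not validated, and the special handling of topics that start with "$" is ignored.

```java
// Hypothetical sketch of MQTT topic filter matching (simplified).
final class TopicFilterSketch {

    /** Returns true if the topic name matches the topic filter. */
    static boolean matches(String filter, String topic) {
        final String[] filterLevels = filter.split("/", -1);
        final String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: matches the rest, but must be the last level.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                // The filter has more levels than the topic.
                return false;
            }
            // "+" matches any single level; otherwise the levels must be equal.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without "#", the filter and the topic must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("+", "myTopic"));
        System.out.println(matches("+", "building/floor1"));
        System.out.println(matches("#", "building/floor1"));
    }
}
```

Because "+" covers only a single level, the subscription in the example would not receive messages published to a multi-level topic such as "building/floor1".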
- STOMP+WSS
/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.*;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;

import java.lang.reflect.Type;

public class AmazonMQExampleStompWss {

    // Specify the connection parameters.
    private final static String DESTINATION = "/queue";
    private final static String WIRE_LEVEL_ENDPOINT =
            "wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619";
    private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
    private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

    public static void main(String[] args) throws Exception {
        final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss();

        final StompSession stompSession = example.connect();

        example.subscribeToDestination(stompSession);
        System.out.println("Subscribed to a destination using session.");

        example.sendMessage(stompSession);
        System.out.println("Sent message to session.");

        // Keep the process alive long enough to receive the message.
        Thread.sleep(60000);
    }

    private StompSession connect() throws Exception {
        // Create a client.
        final WebSocketClient client = new StandardWebSocketClient();
        final WebSocketStompClient stompClient = new WebSocketStompClient(client);
        stompClient.setMessageConverter(new StringMessageConverter());

        final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();

        // Create headers with authentication parameters.
        final StompHeaders head = new StompHeaders();
        head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME);
        head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD);

        final StompSessionHandler sessionHandler = new MySessionHandler();

        // Create a connection.
        return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head,
                sessionHandler).get();
    }

    private void subscribeToDestination(final StompSession stompSession) {
        stompSession.subscribe(DESTINATION, new MyFrameHandler());
    }

    private void sendMessage(final StompSession stompSession) {
        stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes());
    }

    private static class MySessionHandler extends StompSessionHandlerAdapter {
        @Override
        public void afterConnected(final StompSession stompSession,
                                   final StompHeaders stompHeaders) {
            System.out.println("Connected to broker.");
        }
    }

    private static class MyFrameHandler implements StompFrameHandler {
        @Override
        public Type getPayloadType(final StompHeaders headers) {
            return String.class;
        }

        @Override
        public void handleFrame(final StompHeaders stompHeaders,
                                final Object message) {
            System.out.println("Received message from topic: " + message);
        }
    }
}
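To make the role of the headers in the STOMP+WSS example more concrete, the following sketch shows roughly what a STOMP frame looks like on the wire: a command line, header lines in name:value form, a blank line, the body, and a terminating NUL character. This is a simplified illustration under stated assumptions, not Spring's implementation: the class name StompFrameSketch is hypothetical, and header value escaping and the content-length header are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the STOMP text frame layout (simplified).
final class StompFrameSketch {

    /** Builds a frame: command, headers, blank line, body, NUL terminator. */
    static String frame(String command, Map<String, String> headers, String body) {
        final StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        // A blank line separates the headers from the body; NUL ends the frame.
        sb.append('\n').append(body).append('\0');
        return sb.toString();
    }

    public static void main(String[] args) {
        // The header name mirrors what the example sends to the "/queue" destination.
        final Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/queue");
        final String send = frame("SEND", headers, "Hello from Amazon MQ!");
        // Replace NUL with a visible marker for printing.
        System.out.println(send.replace("\0", "^@"));
    }
}
```

In the example itself, WebSocketStompClient builds and parses these frames for you; the login and passcode headers set in connect() travel in the initial connection frame in the same name:value form.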