Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
La mensajería de publicación y suscripción (pubsub) le permite enviar y recibir mensajes sobre temas. Los componentes pueden publicar mensajes en los temas para enviar mensajes a otros componentes. A continuación, los componentes que están suscritos a ese tema pueden actuar en función de los mensajes que reciben.
nota
No puede utilizar este servicio de publicación/suscripción de IPC para publicar o suscribirse a MQTT AWS IoT Core . Para obtener más información acerca de cómo intercambiar mensajes con AWS IoT Core MQTT, consultePublicar/suscribir mensajes MQTT AWS IoT Core.
Versiones mínimas de SDK
En la siguiente tabla se enumeran las versiones mínimas de las SDK para dispositivos con AWS IoT que debe utilizar para publicar y suscribirse a los mensajes de temas locales y desde ellos.
SDK | Versión mínima |
---|---|
Versión 1.2.10 |
|
Versión 1.5.3 |
|
Versión 1.17.0 |
|
Versión 1.12.0 |
Autorización
Para utilizar la mensajería de publicación y suscripción local en un componente personalizado, debe definir políticas de autorización que permitan a su componente enviar y recibir mensajes sobre los temas. Para obtener información sobre cómo definir las políticas de autorización, consulte Autorización de los componentes para realizar operaciones de IPC.
Las políticas de autorización para mensajería de publicación y suscripción tienen las siguientes propiedades.
Identificador de servicio IPC: aws.greengrass.ipc.pubsub
Operación | Descripción | Recursos |
---|---|---|
|
Permite que un componente publique mensajes en los temas que especifique. |
Una cadena de tema, como Esta cadena de tema no admite los caracteres comodín ( |
|
Permite que un componente se suscriba a los mensajes de los temas que especifique. |
Una cadena de tema, como En el núcleo de Greengrass versión 2.6.0 y versiones posteriores, puede suscribirse a temas que contengan comodines de temas MQTT ( |
|
Permite que un componente publique y se suscriba a los mensajes de los temas que especifique. |
Una cadena de tema, como En el núcleo de Greengrass versión 2.6.0 y versiones posteriores, puede suscribirse a temas que contengan comodines de temas MQTT ( |
Ejemplos de políticas de autorización
Puede consultar el siguiente ejemplo de política de autorización con el fin de configurar las políticas de autorización para sus componentes.
ejemplo Ejemplo de política de autorización
El siguiente ejemplo de política de autorización permite a un componente publicar y suscribirse a todos los temas.
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.MyLocalPubSubComponent
:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
PublishToTopic
Publique un mensaje en un tema
Solicitud
Esta solicitud de operación tiene los siguientes parámetros:
topic
-
El tema en el que se va a publicar el mensaje.
publishMessage
(Python:publish_message
)-
El mensaje a publicar. Este objeto,
PublishMessage
, contiene la siguiente información. Debe especificar uno entrejsonMessage
ybinaryMessage
.jsonMessage
(Python:json_message
)-
(Opcional) Un mensaje JSON. Este objeto,
JsonMessage
, contiene la siguiente información:message
-
El mensaje JSON como un objeto.
context
-
El contexto del mensaje, como el tema en el que se publicó el mensaje.
Esta característica está disponible para la versión 2.6.0 y versiones posteriores del componente núcleo de Greengrass. En la siguiente tabla, se enumeran las versiones mínimas de SDK para dispositivos con AWS IoT que debe utilizar para acceder al contexto del mensaje.
SDK Versión mínima Versión 1.9.3
Versión 1.11.3
Versión 1.18.4
Versión 1.12.0
nota
El software AWS IoT Greengrass Core utiliza los mismos objetos de mensaje en las
SubscribeToTopic
operacionesPublishToTopic
y. El software AWS IoT Greengrass Core establece este objeto de contexto en los mensajes cuando te suscribes e ignora este objeto de contexto en los mensajes que publicas.Este objeto,
MessageContext
, contiene la siguiente información:topic
-
El tema donde se publicó el mensaje.
binaryMessage
(Python:binary_message
)-
(Opcional) Un mensaje binario. Este objeto,
BinaryMessage
, contiene la siguiente información:message
-
El mensaje binario es un blob.
context
-
El contexto del mensaje, como el tema en el que se publicó el mensaje.
Esta característica está disponible para la versión 2.6.0 y versiones posteriores del componente núcleo de Greengrass. En la siguiente tabla, se enumeran las versiones mínimas de SDK para dispositivos con AWS IoT que debe utilizar para acceder al contexto del mensaje.
SDK Versión mínima Versión 1.9.3
Versión 1.11.3
Versión 1.18.4
Versión 1.12.0
nota
El software AWS IoT Greengrass Core utiliza los mismos objetos de mensaje en las
SubscribeToTopic
operacionesPublishToTopic
y. El software AWS IoT Greengrass Core establece este objeto de contexto en los mensajes cuando te suscribes e ignora este objeto de contexto en los mensajes que publicas.Este objeto,
MessageContext
, contiene la siguiente información:topic
-
El tema donde se publicó el mensaje.
Respuesta
Esta operación no proporciona ninguna información en su respuesta.
Ejemplos
En los ejemplos siguientes, se muestra cómo llamar a esta operación en código de componente personalizado.
ejemplo Ejemplo: publicar un mensaje binario
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import java.nio.charset.StandardCharsets;
public class PublishToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponse publishBinaryMessageToTopic(
GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
BinaryMessage binaryMessage =
new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
PublishToTopicRequest publishToTopicRequest =
new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
return ipcClient.publishToTopic(publishToTopicRequest);
}
}
SubscribeToTopic
Suscripción a los mensajes sobre un tema.
Esta es una operación de suscripción en la que se suscribe a un flujo de mensajes de eventos. Para usar esta operación, defina un identificador de respuesta de flujo con funciones que gestionen los mensajes de eventos, los errores y el cierre del flujo. Para obtener más información, consulte Suscripción a los flujos de eventos de IPC.
Tipo de mensaje del evento: SubscriptionResponseMessage
Solicitud
Esta solicitud de operación tiene los siguientes parámetros:
topic
-
El tema al que se suscribe.
nota
En el núcleo de Greengrass versión 2.6.0 y versiones posteriores, este tema admite los comodines (
#
y+
) de los temas MQTT. receiveMode
(Python:receive_mode
)-
(Opcional) El comportamiento que especifica si el componente recibe mensajes de sí mismo. Puede cambiar este comportamiento para permitir que un componente actúe sobre sus propios mensajes. El comportamiento predeterminado depende de si el tema contiene un comodín MQTT. Puede elegir entre las siguientes opciones:
-
RECEIVE_ALL_MESSAGES
: reciba todos los mensajes que coincidan con el tema, incluidos los mensajes del componente al que se suscribe.Este modo es la opción por defecto cuando se suscribe a un tema que no contiene un comodín MQTT.
-
RECEIVE_MESSAGES_FROM_OTHERS
: reciba todos los mensajes que coincidan con el tema, excepto los del componente al que se suscribe.Este modo es la opción por defecto cuando se suscribe a un tema que contiene un comodín MQTT.
Esta característica está disponible para la versión 2.6.0 y versiones posteriores del componente núcleo de Greengrass. En la siguiente tabla se enumeran las versiones mínimas del SDK para dispositivos con AWS IoT que debe utilizar para configurar el modo de recepción.
SDK Versión mínima Versión 1.9.3
Versión 1.11.3
Versión 1.18.4
Versión 1.12.0
-
Respuesta
Esta respuesta de operación contiene la siguiente información:
messages
-
El flujo de mensajes. Este objeto,
SubscriptionResponseMessage
, contiene la siguiente información. Cada mensaje contienejsonMessage
obinaryMessage
.jsonMessage
(Python:json_message
)-
(Opcional) Un mensaje JSON. Este objeto,
JsonMessage
, contiene la siguiente información:message
-
El mensaje JSON como un objeto.
context
-
El contexto del mensaje, como el tema en el que se publicó el mensaje.
Esta característica está disponible para la versión 2.6.0 y versiones posteriores del componente núcleo de Greengrass. En la siguiente tabla, se enumeran las versiones mínimas de SDK para dispositivos con AWS IoT que debe utilizar para acceder al contexto del mensaje.
SDK Versión mínima Versión 1.9.3
Versión 1.11.3
Versión 1.18.4
Versión 1.12.0
nota
El software AWS IoT Greengrass Core utiliza los mismos objetos de mensaje en las
SubscribeToTopic
operacionesPublishToTopic
y. El software AWS IoT Greengrass Core establece este objeto de contexto en los mensajes cuando te suscribes e ignora este objeto de contexto en los mensajes que publicas.Este objeto,
MessageContext
, contiene la siguiente información:topic
-
El tema donde se publicó el mensaje.
binaryMessage
(Python:binary_message
)-
(Opcional) Un mensaje binario. Este objeto,
BinaryMessage
, contiene la siguiente información:message
-
El mensaje binario es un blob.
context
-
El contexto del mensaje, como el tema en el que se publicó el mensaje.
Esta característica está disponible para la versión 2.6.0 y versiones posteriores del componente núcleo de Greengrass. En la siguiente tabla, se enumeran las versiones mínimas de SDK para dispositivos con AWS IoT que debe utilizar para acceder al contexto del mensaje.
SDK Versión mínima Versión 1.9.3
Versión 1.11.3
Versión 1.18.4
Versión 1.12.0
nota
El software AWS IoT Greengrass Core utiliza los mismos objetos de mensaje en las
SubscribeToTopic
operacionesPublishToTopic
y. El software AWS IoT Greengrass Core establece este objeto de contexto en los mensajes cuando te suscribes e ignora este objeto de contexto en los mensajes que publicas.Este objeto,
MessageContext
, contiene la siguiente información:topic
-
El tema donde se publicó el mensaje.
topicName
(Python:topic_name
)-
El tema en el que se publicó el mensaje.
nota
En la actualidad, esta propiedad no se utiliza. En el núcleo de Greengrass versión 2.6.0 y versiones posteriores, puede obtener el valor
(jsonMessage|binaryMessage).context.topic
de unSubscriptionResponseMessage
para obtener el tema en el que se publicó el mensaje.
Ejemplos
En los ejemplos siguientes, se muestra cómo llamar a esta operación en código de componente personalizado.
ejemplo Ejemplo: suscribirse a mensajería de publicación y suscripción local
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
Ejemplos
Utilice los siguientes ejemplos para obtener información sobre cómo utilizar el servicio de publicación/suscripción de IPC en sus componentes.
La siguiente receta de ejemplo permite que el componente publique en todos los temas.
El siguiente ejemplo de aplicación Java demuestra cómo utilizar el servicio IPC de publicación/suscripción para publicar mensajes en otros componentes.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
public static void main(String[] args) {
String message = "Hello from the pub/sub publisher (Java).";
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
while (true) {
PublishToTopicRequest publishRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishRequest.setPublishMessage(publishMessage);
publishRequest.setTopic(topic);
CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
.publishToTopic(publishRequest, Optional.empty()).getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Execution exception while publishing to topic: " + topic);
}
throw e;
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
System.out.println("Publisher interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
}
La siguiente receta de ejemplo permite que el componente se suscriba a todos los temas.
El siguiente ejemplo de aplicación Java muestra cómo utilizar el servicio IPC de publicación/suscripción para suscribirse a los mensajes de otros componentes.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubSubscriber {
public static void main(String[] args) {
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
subscribeRequest.setTopic(topic);
SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
.subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
System.err.println("Execution exception while subscribing to topic: " + topic);
}
throw e;
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message = new String(subscriptionResponseMessage.getBinaryMessage()
.getMessage(), StandardCharsets.UTF_8);
System.out.println("Received new message: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
La siguiente receta de ejemplo permite que el componente publique en todos los temas.
El siguiente ejemplo de aplicación de Python demuestra cómo utilizar el servicio IPC de publicación/suscripción para publicar mensajes en otros componentes.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
UnauthorizedError
)
topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10
try:
ipc_client = awsiot.greengrasscoreipc.connect()
while True:
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully published to topic: ' + topic)
except concurrent.futures.TimeoutError:
print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
except UnauthorizedError as e:
print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while publishing to topic: ' + topic, file=sys.stderr)
raise e
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
La siguiente receta de ejemplo permite que el componente se suscriba a todos los temas.
El siguiente ejemplo de aplicación de Python demuestra cómo utilizar el servicio IPC de publicación/suscripción para suscribirse a los mensajes de otros componentes.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage,
UnauthorizedError
)
topic = "test/topic/python"
TIMEOUT = 10
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, "utf-8")
print("Received new message: " + message)
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
print("Received a stream error.", file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
print('Subscribe to topic stream closed.')
try:
ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully subscribed to topic: ' + topic)
except concurrent.futures.TimeoutError as e:
print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except UnauthorizedError as e:
print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
raise e
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
La siguiente receta de ejemplo permite que el componente publique en todos los temas.
El siguiente ejemplo de aplicación de C++ demuestra cómo utilizar el servicio IPC de publicación/suscripción para publicar mensajes en otros componentes.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the pub/sub publisher (C++).");
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
La siguiente receta de ejemplo permite que el componente se suscriba a todos los temas.
El siguiente ejemplo de aplicación de C++ demuestra cómo utilizar el servicio IPC de publicación/suscripción para suscribirse a los mensajes de otros componentes.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
std::cout << "Received new message: " << messageString << std::endl;
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::cout << "Received new message: " << messageString << std::endl;
}
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to topic stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToTopicRequest request;
request.SetTopic(topic);
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}