
Publish/subscribe local messages


This content was produced by machine translation. If the translation conflicts with the original English version, the English version prevails.


Publish/subscribe (pubsub) messaging lets you send and receive messages on topics. Components publish messages to topics to send them to other components. Components that are subscribed to a topic can then act on the messages that they receive.
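The publish/subscribe pattern described above can be illustrated with a toy in-memory broker. This is only a conceptual sketch: the `LocalBroker` class and its methods are hypothetical names invented for this illustration and are not part of the AWS IoT Device SDK or the Greengrass IPC client.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# A handler receives the topic and the raw payload of each message.
Handler = Callable[[str, bytes], None]


class LocalBroker:
    """Toy in-memory broker (hypothetical): maps each topic to its subscribers."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        """Register a handler for messages published on exactly this topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver the payload to every subscriber; return the delivery count."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(topic, payload)
        return len(handlers)


received = []
broker = LocalBroker()
broker.subscribe("test/topic", lambda topic, payload: received.append((topic, payload)))
broker.publish("test/topic", b"Hello, World!")
broker.publish("other/topic", b"ignored")  # no subscribers, so nothing is delivered
print(received)
```

In a real component, the Greengrass nucleus plays the role of the broker and the IPC client operations PublishToTopic and SubscribeToTopic replace these two methods.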

Note

You can't use this publish/subscribe IPC service to publish or subscribe to AWS IoT Core MQTT. For more information about how to exchange messages with AWS IoT Core MQTT, see Publish/subscribe AWS IoT Core MQTT messages.

Minimum SDK versions

The following table lists the minimum versions of the AWS IoT Device SDK that you must use to publish and subscribe to messages to and from local topics.

Authorization

To use local publish/subscribe messaging in a custom component, you must define authorization policies that allow your component to send and receive messages on topics. For information about defining authorization policies, see Authorize components to perform IPC operations.

Authorization policies for publish/subscribe messaging have the following properties.

IPC service identifier: aws.greengrass.ipc.pubsub

Operation: aws.greengrass#PublishToTopic

  Description: Allows a component to publish messages to the topics that you specify.

  Resources: A topic string, such as test/topic. Use a * to match any combination of characters in a topic. This topic string doesn't support MQTT topic wildcards (# and +).

Operation: aws.greengrass#SubscribeToTopic

  Description: Allows a component to subscribe to messages for the topics that you specify.

  Resources: A topic string, such as test/topic. Use a * to match any combination of characters in a topic. In Greengrass nucleus v2.6.0 and later, you can subscribe to topics that contain MQTT topic wildcards (# and +). This topic string supports MQTT topic wildcards as literal characters. For example, if a component's authorization policy grants access to test/topic/#, the component can subscribe to test/topic/#, but it can't subscribe to test/topic/filter.

Operation: *

  Description: Allows a component to publish and subscribe to messages for the topics that you specify.

  Resources: A topic string, such as test/topic. Use a * to match any combination of characters in a topic. In Greengrass nucleus v2.6.0 and later, you can subscribe to topics that contain MQTT topic wildcards (# and +). This topic string supports MQTT topic wildcards as literal characters. For example, if a component's authorization policy grants access to test/topic/#, the component can subscribe to test/topic/#, but it can't subscribe to test/topic/filter.
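The resource-matching rules above can be sketched in a few lines. This is an illustration of the described semantics under stated assumptions, not the Greengrass nucleus implementation: `resource_matches` is a hypothetical helper, and it assumes that * matches any run of characters (including the / separator) while # and + are compared literally.

```python
import re


def resource_matches(resource: str, topic: str) -> bool:
    """Return True if an authorization resource string covers the topic.

    Assumption: only '*' acts as a wildcard and matches any run of characters.
    The MQTT wildcards '#' and '+' are treated as literal characters.
    """
    # Escape every literal fragment, then join the fragments with '.*'.
    pattern = ".*".join(re.escape(part) for part in resource.split("*"))
    return re.fullmatch(pattern, topic, flags=re.DOTALL) is not None


# A policy for test/topic/# authorizes subscribing to the filter itself...
print(resource_matches("test/topic/#", "test/topic/#"))
# ...but not to a concrete topic that the MQTT filter would match.
print(resource_matches("test/topic/#", "test/topic/filter"))
```

To let a component subscribe to both the wildcard filter and concrete topics under test/topic/, a policy would need a resource such as test/topic/* in this model.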

Authorization policy examples

You can reference the following authorization policy example to help you configure authorization policies for your components.

Example authorization policy

The following example authorization policy allows a component to publish and subscribe to all topics.

{
  "accessControl": {
    "aws.greengrass.ipc.pubsub": {
      "com.example.MyLocalPubSubComponent:pubsub:1": {
        "policyDescription": "Allows access to publish/subscribe to all topics.",
        "operations": [
          "aws.greengrass#PublishToTopic",
          "aws.greengrass#SubscribeToTopic"
        ],
        "resources": [
          "*"
        ]
      }
    }
  }
}
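A narrower policy has the same shape with fewer operations and more specific resources. The following sketch generates such a policy as JSON. The `build_pubsub_policy` helper and the `:pubsub:1` policy ID suffix are assumptions made for illustration (the suffix mirrors the example above); the helper is not part of any AWS tool.

```python
import json
from typing import Any, Dict, Iterable


def build_pubsub_policy(
    component_name: str,
    operations: Iterable[str],
    resources: Iterable[str],
    description: str,
) -> Dict[str, Any]:
    """Build an accessControl block for the local pubsub IPC service (hypothetical helper)."""
    policy_id = f"{component_name}:pubsub:1"  # assumed naming, mirrors the example above
    return {
        "accessControl": {
            "aws.greengrass.ipc.pubsub": {
                policy_id: {
                    "policyDescription": description,
                    "operations": list(operations),
                    "resources": list(resources),
                }
            }
        }
    }


# Allow the component to subscribe to a single topic only.
policy = build_pubsub_policy(
    "com.example.MyLocalPubSubComponent",
    ["aws.greengrass#SubscribeToTopic"],
    ["test/topic"],
    "Allows access to subscribe to test/topic.",
)
print(json.dumps(policy, indent=2))
```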

PublishToTopic

Publish a message to a topic.

Request

This operation's request has the following parameters:

topic

  The topic to which to publish the message.

publishMessage (Python: publish_message)

  The message to publish. This object, PublishMessage, contains the following information. You must specify one of jsonMessage and binaryMessage.

  jsonMessage (Python: json_message)

    (Optional) A JSON message. This object, JsonMessage, contains the following information:

    message

      The JSON message as an object.

    context

      The context of the message, such as the topic where the message was published.

      This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the AWS IoT Device SDK that you must use to access the message context.

      Note

      The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic and SubscribeToTopic operations. The AWS IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.

      This object, MessageContext, contains the following information:

      topic

        The topic where the message was published.

  binaryMessage (Python: binary_message)

    (Optional) A binary message. This object, BinaryMessage, contains the following information:

    message

      The binary message as a blob.

    context

      The context of the message, such as the topic where the message was published.

      This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the AWS IoT Device SDK that you must use to access the message context.

      Note

      The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic and SubscribeToTopic operations. The AWS IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.

      This object, MessageContext, contains the following information:

      topic

        The topic where the message was published.
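The request structure above can be sketched with plain dictionaries. This is a stand-in for the SDK model classes, not the SDK itself: `build_publish_message` is a hypothetical helper, and it reads "one of jsonMessage and binaryMessage" as "exactly one", which is an assumption. It leaves the context unset because the Core software ignores context on published messages.

```python
from typing import Any, Dict, Optional


def build_publish_message(
    json_message: Optional[Dict[str, Any]] = None,
    binary_message: Optional[bytes] = None,
) -> Dict[str, Any]:
    """Build a dict shaped like PublishMessage (hypothetical stand-in for the SDK model)."""
    # Reject the call when both or neither payload is supplied.
    if (json_message is None) == (binary_message is None):
        raise ValueError("Specify exactly one of json_message or binary_message.")
    if json_message is not None:
        return {"jsonMessage": {"message": json_message}}
    return {"binaryMessage": {"message": binary_message}}


request = {
    "topic": "test/topic",
    "publishMessage": build_publish_message(json_message={"temperature": 21.5}),
}
print(request)
```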

Response

This operation doesn't provide any information in its response.

Examples

The following examples demonstrate how to call this operation in custom component code.

Java (IPC client V2)
Example: Publish a binary message

package com.aws.greengrass.docs.samples.ipc;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;

import java.nio.charset.StandardCharsets;

public class PublishToTopicV2 {

    public static void main(String[] args) {
        String topic = args[0];
        String message = args[1];
        try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
            PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
            System.out.println("Successfully published to topic: " + topic);
        } catch (Exception e) {
            if (e.getCause() instanceof UnauthorizedError) {
                System.err.println("Unauthorized error while publishing to topic: " + topic);
            } else {
                System.err.println("Exception occurred when using IPC.");
            }
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static PublishToTopicResponse publishBinaryMessageToTopic(
            GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
        // Encode the text as UTF-8 bytes and wrap it in a binary message.
        BinaryMessage binaryMessage =
                new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
        PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
        PublishToTopicRequest publishToTopicRequest =
                new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
        return ipcClient.publishToTopic(publishToTopicRequest);
    }
}
Python (IPC client V2)
Example: Publish a binary message

import sys
import traceback

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    PublishMessage,
    BinaryMessage
)


def main():
    args = sys.argv[1:]
    topic = args[0]
    message = args[1]

    try:
        ipc_client = GreengrassCoreIPCClientV2()
        publish_binary_message_to_topic(ipc_client, topic, message)
        print('Successfully published to topic: ' + topic)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)


def publish_binary_message_to_topic(ipc_client, topic, message):
    # Encode the text as UTF-8 bytes and wrap it in a binary message.
    binary_message = BinaryMessage(message=bytes(message, 'utf-8'))
    publish_message = PublishMessage(binary_message=binary_message)
    return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message)


if __name__ == '__main__':
    main()
C++
Example: Publish a binary message

#include <chrono>
#include <iostream>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    String topic("my/topic");
    String message("Hello, World!");
    int timeout = 10;

    // Build the request: raw bytes -> BinaryMessage -> PublishMessage.
    PublishToTopicRequest request;
    Vector<uint8_t> messageData({message.begin(), message.end()});
    BinaryMessage binaryMessage;
    binaryMessage.SetMessage(messageData);
    PublishMessage publishMessage;
    publishMessage.SetBinaryMessage(binaryMessage);
    request.SetTopic(topic);
    request.SetPublishMessage(publishMessage);

    auto operation = ipcClient.NewPublishToTopic();
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            (void)error;
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
    }

    return 0;
}
JavaScript
Example: Publish a binary message

import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";

class PublishToTopic {
    private ipcClient : greengrasscoreipc.Client
    private readonly topic : string;
    private readonly messageString : string;

    constructor() {
        // define your own constructor, e.g.
        this.topic = "<define_your_topic>";
        this.messageString = "<define_your_message_string>";
        this.publishToTopic().then(r => console.log("Started workflow"));
    }

    private async publishToTopic() {
        try {
            this.ipcClient = await getIpcClient();

            const binaryMessage : BinaryMessage = {
                message: this.messageString
            }

            const publishMessage : PublishMessage = {
                binaryMessage: binaryMessage
            }

            const request : PublishToTopicRequest = {
                topic: this.topic,
                publishMessage: publishMessage
            }

            this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`))
        } catch (e) {
            // parse the error depending on your use cases
            throw e
        }
    }
}

export async function getIpcClient(){
    try {
        const ipcClient = greengrasscoreipc.createClient();
        await ipcClient.connect()
            .catch(error => {
                // parse the error depending on your use cases
                throw error;
            });
        return ipcClient
    } catch (err) {
        // parse the error depending on your use cases
        throw err
    }
}

// starting point
const publishToTopic = new PublishToTopic();

SubscribeToTopic

Subscribe to messages on a topic.

This operation is a subscription operation where you subscribe to a stream of event messages. To use this operation, define a stream response handler with functions that handle event messages, errors, and stream closure. For more information, see Subscribe to IPC event streams.

Event message type: SubscriptionResponseMessage

Request

This operation's request has the following parameters:

topic

  The topic to which to subscribe.

  Note

  In Greengrass nucleus v2.6.0 and later, this topic supports MQTT topic wildcards (# and +).

receiveMode (Python: receive_mode)

  (Optional) The behavior that specifies whether the component receives messages from itself. You can change this behavior to allow a component to act on its own messages. The default behavior depends on whether the topic contains an MQTT wildcard. Choose from the following options:

  • RECEIVE_ALL_MESSAGES: Receive all messages that match the topic, including messages from the component that subscribes.

    This mode is the default option when you subscribe to a topic that doesn't contain an MQTT wildcard.

  • RECEIVE_MESSAGES_FROM_OTHERS: Receive all messages that match the topic, except messages from the component that subscribes.

    This mode is the default option when you subscribe to a topic that contains an MQTT wildcard.

  This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the AWS IoT Device SDK that you must use to set the receive mode.
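The request parameters above combine two rules: MQTT-style wildcard matching on the topic, and a default receive mode that depends on whether the topic contains a wildcard. The sketch below models both with standard MQTT filter semantics (+ matches one level, # matches the remaining levels). All function names are hypothetical, and the Greengrass nucleus might differ from this model in edge cases.

```python
def has_mqtt_wildcard(topic: str) -> bool:
    """Return True if any topic level is an MQTT wildcard ('#' or '+')."""
    return any(level in ("#", "+") for level in topic.split("/"))


def default_receive_mode(topic: str) -> str:
    """Return the default receive mode described above for a subscription topic."""
    if has_mqtt_wildcard(topic):
        return "RECEIVE_MESSAGES_FROM_OTHERS"
    return "RECEIVE_ALL_MESSAGES"


def mqtt_filter_matches(topic_filter: str, topic: str) -> bool:
    """Match a concrete topic against a filter using standard MQTT semantics."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # '#' matches all remaining levels, including none
        if index >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # a literal level must match exactly
    return len(filter_levels) == len(topic_levels)


def should_deliver(subscriber: str, publisher: str, receive_mode: str) -> bool:
    """Decide whether a subscriber receives a message from a given publisher."""
    if receive_mode == "RECEIVE_ALL_MESSAGES":
        return True
    return subscriber != publisher


mode = default_receive_mode("test/topic/#")
print(mode)
print(mqtt_filter_matches("test/topic/#", "test/topic/filter"))
print(should_deliver("com.example.A", "com.example.A", mode))
```

Under this model, a component that subscribes to test/topic/# without setting receiveMode would not see its own messages, while a subscription to the exact topic test/topic would.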

Response

This operation's response has the following information:

messages

  The stream of messages. This object, SubscriptionResponseMessage, contains the following information. Each message contains jsonMessage or binaryMessage.

  jsonMessage (Python: json_message)

    (Optional) A JSON message. This object, JsonMessage, contains the following information:

    message

      The JSON message as an object.

    context

      The context of the message, such as the topic where the message was published.

      This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the AWS IoT Device SDK that you must use to access the message context.

      Note

      The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic and SubscribeToTopic operations. The AWS IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.

      This object, MessageContext, contains the following information:

      topic

        The topic where the message was published.

  binaryMessage (Python: binary_message)

    (Optional) A binary message. This object, BinaryMessage, contains the following information:

    message

      The binary message as a blob.

    context

      The context of the message, such as the topic where the message was published.

      This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the AWS IoT Device SDK that you must use to access the message context.

      Note

      The AWS IoT Greengrass Core software uses the same message objects in the PublishToTopic and SubscribeToTopic operations. The AWS IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.

      This object, MessageContext, contains the following information:

      topic

        The topic where the message was published.

topicName (Python: topic_name)

  The topic to which the message was published.

  Note

  This property isn't currently used. In Greengrass nucleus v2.6.0 and later, you can get the (jsonMessage|binaryMessage).context.topic value from a SubscriptionResponseMessage to get the topic where the message was published.
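Because topicName is unused, a handler reads the topic from whichever payload is present. The following sketch shows that lookup on plain dictionaries shaped like a SubscriptionResponseMessage. The dictionaries and the `published_topic` helper are stand-ins invented for this illustration; real SDK objects expose the same fields as attributes or getters, as the examples below show.

```python
from typing import Any, Dict, Optional


def published_topic(message: Dict[str, Any]) -> Optional[str]:
    """Return (jsonMessage|binaryMessage).context.topic, or None if it is absent."""
    for key in ("jsonMessage", "binaryMessage"):
        payload = message.get(key)
        if payload is not None:
            # The context can be missing on nucleus versions earlier than 2.6.0.
            context = payload.get("context") or {}
            return context.get("topic")
    return None


event = {
    "binaryMessage": {
        "message": b"Hello, World!",
        "context": {"topic": "test/topic/java"},
    }
}
print(published_topic(event))
```

This matters most for wildcard subscriptions, where one handler receives messages from several concrete topics.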

Examples

The following examples demonstrate how to call this operation in custom component code.

Java (IPC client V2)
Example: Subscribe to local publish/subscribe messages

package com.aws.greengrass.docs.samples.ipc;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class SubscribeToTopicV2 {

    public static void main(String[] args) {
        String topic = args[0];
        try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
            SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
            GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
                    SubscribeToTopicResponseHandler> response =
                    ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
                            Optional.of(SubscribeToTopicV2::onStreamError),
                            Optional.of(SubscribeToTopicV2::onStreamClosed));
            SubscribeToTopicResponseHandler responseHandler = response.getHandler();
            System.out.println("Successfully subscribed to topic: " + topic);

            // Keep the main thread alive, or the process will exit.
            try {
                while (true) {
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                System.out.println("Subscribe interrupted.");
            }

            // To stop subscribing, close the stream.
            responseHandler.closeStream();
        } catch (Exception e) {
            if (e.getCause() instanceof UnauthorizedError) {
                System.err.println("Unauthorized error while subscribing to topic: " + topic);
            } else {
                System.err.println("Exception occurred when using IPC.");
            }
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
        try {
            BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
            String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
            String topic = binaryMessage.getContext().getTopic();
            System.out.printf("Received new message on topic %s: %s%n", topic, message);
        } catch (Exception e) {
            System.err.println("Exception occurred while processing subscription response message.");
            e.printStackTrace();
        }
    }

    public static boolean onStreamError(Throwable error) {
        System.err.println("Received a stream error.");
        error.printStackTrace();
        return false; // Return true to close stream, false to keep stream open.
    }

    public static void onStreamClosed() {
        System.out.println("Subscribe to topic stream closed.");
    }
}
Python (IPC client V2)
Example: Subscribe to local publish/subscribe messages

import sys
import time
import traceback

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    SubscriptionResponseMessage,
    UnauthorizedError
)


def main():
    args = sys.argv[1:]
    topic = args[0]

    try:
        ipc_client = GreengrassCoreIPCClientV2()
        # Subscription operations return a tuple with the response and the operation.
        _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
                                                     on_stream_error=on_stream_error,
                                                     on_stream_closed=on_stream_closed)
        print('Successfully subscribed to topic: ' + topic)

        # Keep the main thread alive, or the process will exit.
        try:
            while True:
                time.sleep(10)
        except InterruptedError:
            print('Subscribe interrupted.')

        # To stop subscribing, close the stream.
        operation.close()
    except UnauthorizedError:
        print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)


def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except Exception:
        traceback.print_exc()


def on_stream_error(error: Exception) -> bool:
    print('Received a stream error.', file=sys.stderr)
    # Print the error that was passed in; no exception is active in this callback.
    traceback.print_exception(type(error), error, error.__traceback__)
    return False  # Return True to close stream, False to keep stream open.


def on_stream_closed() -> None:
    print('Subscribe to topic stream closed.')


if __name__ == '__main__':
    main()
C++
Example: Subscribe to local publish/subscribe messages

#include <chrono>
#include <iostream>
#include <thread>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
    public:
        virtual ~SubscribeResponseHandler() {}

    private:
        void OnStreamEvent(SubscriptionResponseMessage *response) override {
            auto jsonMessage = response->GetJsonMessage();
            if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
                auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
                // Handle JSON message.
            } else {
                auto binaryMessage = response->GetBinaryMessage();
                if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
                    auto messageBytes = binaryMessage.value().GetMessage().value();
                    std::string messageString(messageBytes.begin(), messageBytes.end());
                    // Handle binary message.
                }
            }
        }

        bool OnStreamError(OperationError *error) override {
            // Handle error.
            return false; // Return true to close stream, false to keep stream open.
        }

        void OnStreamClosed() override {
            // Handle close.
        }
};

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    String topic("my/topic");
    int timeout = 10;

    SubscribeToTopicRequest request;
    request.SetTopic(topic);

    auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
    auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            (void)error;
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
        exit(-1);
    }

    // Keep the main thread alive, or the process will exit.
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }

    operation->Close();
    return 0;
}
JavaScript
Example: Subscribe to local publish/subscribe messages

import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";

class SubscribeToTopic {
    private ipcClient : greengrasscoreipc.Client
    private readonly topic : string;

    constructor() {
        // define your own constructor, e.g.
        this.topic = "<define_your_topic>";
        this.subscribeToTopic().then(r => console.log("Started workflow"));
    }

    private async subscribeToTopic() {
        try {
            this.ipcClient = await getIpcClient();

            const subscribeToTopicRequest : SubscribeToTopicRequest = {
                topic: this.topic,
            }

            const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options

            streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
                // parse the message depending on your use cases, e.g.
                if(message.binaryMessage && message.binaryMessage.message) {
                    const receivedMessage = message.binaryMessage?.message.toString();
                }
            });

            streamingOperation.on("streamError", (error : RpcError) => {
                // define your own error handling logic
            })

            streamingOperation.on("ended", () => {
                // define your own logic
            })

            await streamingOperation.activate();

            // Keep the main thread alive, or the process will exit.
            await new Promise((resolve) => setTimeout(resolve, 10000))
        } catch (e) {
            // parse the error depending on your use cases
            throw e
        }
    }
}

export async function getIpcClient(){
    try {
        const ipcClient = greengrasscoreipc.createClient();
        await ipcClient.connect()
            .catch(error => {
                // parse the error depending on your use cases
                throw error;
            });
        return ipcClient
    } catch (err) {
        // parse the error depending on your use cases
        throw err
    }
}

// starting point
const subscribeToTopic = new SubscribeToTopic();

Examples

Use the following examples to learn how to use the publish/subscribe IPC service in your components.

The following example recipe allows the component to publish to all topics.

JSON
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.PubSubPublisherJava",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that publishes messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubPublisherJava:pubsub:1": {
            "policyDescription": "Allows access to publish to all topics.",
            "operations": [
              "aws.greengrass#PublishToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "Run": "java -jar {artifacts:path}/PubSubPublisher.jar"
      }
    }
  ]
}
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherJava ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubPublisherJava:pubsub:1': policyDescription: Allows access to publish to all topics. operations: - 'aws.greengrass#PublishToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubPublisher.jar

The following example Java application demonstrates how to use the publish/subscribe IPC service to publish messages to other components.

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0 */

package com.example.ipc.pubsub;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PubSubPublisher {

    public static void main(String[] args) {
        String message = "Hello from the pub/sub publisher (Java).";
        String topic = "test/topic/java";

        try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
            GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);

            while (true) {
                PublishToTopicRequest publishRequest = new PublishToTopicRequest();
                PublishMessage publishMessage = new PublishMessage();
                BinaryMessage binaryMessage = new BinaryMessage();
                binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
                publishMessage.setBinaryMessage(binaryMessage);
                publishRequest.setPublishMessage(publishMessage);
                publishRequest.setTopic(topic);
                CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
                        .publishToTopic(publishRequest, Optional.empty()).getResponse();

                try {
                    futureResponse.get(10, TimeUnit.SECONDS);
                    System.out.println("Successfully published to topic: " + topic);
                } catch (TimeoutException e) {
                    System.err.println("Timeout occurred while publishing to topic: " + topic);
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof UnauthorizedError) {
                        System.err.println("Unauthorized error while publishing to topic: " + topic);
                    } else {
                        System.err.println("Execution exception while publishing to topic: " + topic);
                    }
                    throw e;
                }
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            System.out.println("Publisher interrupted.");
        } catch (Exception e) {
            System.err.println("Exception occurred when using IPC.");
            e.printStackTrace();
            System.exit(1);
        }
    }
}

The following example recipe allows the component to subscribe to all topics.

JSON
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.PubSubSubscriberJava",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that subscribes to messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubSubscriberJava:pubsub:1": {
            "policyDescription": "Allows access to subscribe to all topics.",
            "operations": [
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "Run": "java -jar {artifacts:path}/PubSubSubscriber.jar"
      }
    }
  ]
}
YAML
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        'com.example.PubSubSubscriberJava:pubsub:1':
          policyDescription: Allows access to subscribe to all topics.
          operations:
            - 'aws.greengrass#SubscribeToTopic'
          resources:
            - '*'
Manifests:
  - Lifecycle:
      Run: |-
        java -jar {artifacts:path}/PubSubSubscriber.jar

The following example Java application demonstrates how to use the publish/subscribe IPC service to subscribe to messages from other components.

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0 */

package com.example.ipc.pubsub;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PubSubSubscriber {

    public static void main(String[] args) {
        String topic = "test/topic/java";

        try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
            GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);

            SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
            subscribeRequest.setTopic(topic);
            SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
                    .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
            CompletableFuture<SubscribeToTopicResponse> futureResponse =
                    operationResponseHandler.getResponse();

            try {
                futureResponse.get(10, TimeUnit.SECONDS);
                System.out.println("Successfully subscribed to topic: " + topic);
            } catch (TimeoutException e) {
                System.err.println("Timeout occurred while subscribing to topic: " + topic);
                throw e;
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnauthorizedError) {
                    System.err.println("Unauthorized error while subscribing to topic: " + topic);
                } else {
                    System.err.println("Execution exception while subscribing to topic: " + topic);
                }
                throw e;
            }

            // Keep the main thread alive, or the process will exit.
            try {
                while (true) {
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                System.out.println("Subscribe interrupted.");
            }
        } catch (Exception e) {
            System.err.println("Exception occurred when using IPC.");
            e.printStackTrace();
            System.exit(1);
        }
    }

    private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {

        @Override
        public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
            try {
                String message = new String(subscriptionResponseMessage.getBinaryMessage()
                        .getMessage(), StandardCharsets.UTF_8);
                System.out.println("Received new message: " + message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean onStreamError(Throwable error) {
            System.err.println("Received a stream error.");
            error.printStackTrace();
            return false; // Return true to close stream, false to keep stream open.
        }

        @Override
        public void onStreamClosed() {
            System.out.println("Subscribe to topic stream closed.");
        }
    }
}

The following example recipe allows the component to publish to all topics.

JSON
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.PubSubPublisherPython",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that publishes messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubPublisherPython:pubsub:1": {
            "policyDescription": "Allows access to publish to all topics.",
            "operations": [
              "aws.greengrass#PublishToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Lifecycle": {
        "install": "python3 -m pip install --user awsiotsdk",
        "Run": "python3 -u {artifacts:path}/pubsub_publisher.py"
      }
    },
    {
      "Platform": {
        "os": "windows"
      },
      "Lifecycle": {
        "install": "py -3 -m pip install --user awsiotsdk",
        "Run": "py -3 -u {artifacts:path}/pubsub_publisher.py"
      }
    }
  ]
}
YAML
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherPython
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        com.example.PubSubPublisherPython:pubsub:1:
          policyDescription: Allows access to publish to all topics.
          operations:
            - aws.greengrass#PublishToTopic
          resources:
            - "*"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      install: python3 -m pip install --user awsiotsdk
      Run: python3 -u {artifacts:path}/pubsub_publisher.py
  - Platform:
      os: windows
    Lifecycle:
      install: py -3 -m pip install --user awsiotsdk
      Run: py -3 -u {artifacts:path}/pubsub_publisher.py
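
The recipe above grants access to every topic through the "*" resource. The following is a minimal sketch, assuming you would rather list only the topics a component needs, of how the same accessControl fragment could be generated. The helper name build_pubsub_policy and its parameters are hypothetical and are not part of any AWS SDK; only the key names and the "aws.greengrass#" operation prefix are taken from the recipe above.

```python
import json


def build_pubsub_policy(component_name, operations, topics, description):
    """Build the accessControl fragment of a recipe (hypothetical helper)."""
    # The policy ID mirrors the "<component>:pubsub:1" naming used in the recipe.
    policy_id = component_name + ":pubsub:1"
    return {
        "accessControl": {
            "aws.greengrass.ipc.pubsub": {
                policy_id: {
                    "policyDescription": description,
                    # Operations carry the "aws.greengrass#" prefix, as in the recipe.
                    "operations": ["aws.greengrass#" + op for op in operations],
                    # List explicit topics here instead of "*".
                    "resources": list(topics),
                }
            }
        }
    }


if __name__ == "__main__":
    policy = build_pubsub_policy(
        "com.example.PubSubPublisherPython",
        ["PublishToTopic"],
        ["test/topic/python"],
        "Allows access to publish to one topic.",
    )
    print(json.dumps(policy, indent=2))
```

The printed fragment is meant to sit under ComponentConfiguration.DefaultConfiguration in the recipe, in place of the accessControl object shown above.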

The following example Python application demonstrates how to use the publish/subscribe IPC service to publish messages to other components.

import concurrent.futures
import sys
import time
import traceback

import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    BinaryMessage,
    UnauthorizedError
)

topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10

try:
    ipc_client = awsiot.greengrasscoreipc.connect()

    while True:
        request = PublishToTopicRequest()
        request.topic = topic
        publish_message = PublishMessage()
        publish_message.binary_message = BinaryMessage()
        publish_message.binary_message.message = bytes(message, "utf-8")
        request.publish_message = publish_message
        operation = ipc_client.new_publish_to_topic()
        operation.activate(request)
        future_response = operation.get_response()

        try:
            future_response.result(TIMEOUT)
            print('Successfully published to topic: ' + topic)
        except concurrent.futures.TimeoutError:
            print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
        except UnauthorizedError as e:
            print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
            raise e
        except Exception as e:
            print('Exception while publishing to topic: ' + topic, file=sys.stderr)
            raise e
        time.sleep(5)
except InterruptedError:
    print('Publisher interrupted.')
except Exception:
    print('Exception occurred when using IPC.', file=sys.stderr)
    traceback.print_exc()
    exit(1)
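
The publisher above sends a plain UTF-8 string as the binary payload. The following is a minimal sketch, assuming you want to exchange structured data over the same binary message, of a JSON round trip. The helper names encode_payload and decode_payload and the sample fields are hypothetical; the sketch uses only the Python standard library and does not call the IPC client.

```python
import json


def encode_payload(data: dict) -> bytes:
    """Serialize a dict to compact UTF-8 JSON bytes for a binary message."""
    return json.dumps(data, separators=(",", ":")).encode("utf-8")


def decode_payload(raw: bytes) -> dict:
    """Parse UTF-8 JSON bytes received from a binary message back into a dict."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    # Hypothetical sample reading; the field names are illustrative only.
    raw = encode_payload({"sensor": "temp-1", "celsius": 21.5})
    print(raw)
    print(decode_payload(raw))
```

In the publisher, the result of encode_payload would take the place of bytes(message, "utf-8"). A subscriber would pass the received event.binary_message.message to decode_payload.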

The following example recipe allows the component to subscribe to all topics.

JSON
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.PubSubSubscriberPython",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that subscribes to messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubSubscriberPython:pubsub:1": {
            "policyDescription": "Allows access to subscribe to all topics.",
            "operations": [
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Lifecycle": {
        "install": "python3 -m pip install --user awsiotsdk",
        "Run": "python3 -u {artifacts:path}/pubsub_subscriber.py"
      }
    },
    {
      "Platform": {
        "os": "windows"
      },
      "Lifecycle": {
        "install": "py -3 -m pip install --user awsiotsdk",
        "Run": "py -3 -u {artifacts:path}/pubsub_subscriber.py"
      }
    }
  ]
}
YAML
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberPython
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        com.example.PubSubSubscriberPython:pubsub:1:
          policyDescription: Allows access to subscribe to all topics.
          operations:
            - aws.greengrass#SubscribeToTopic
          resources:
            - "*"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      install: python3 -m pip install --user awsiotsdk
      Run: python3 -u {artifacts:path}/pubsub_subscriber.py
  - Platform:
      os: windows
    Lifecycle:
      install: py -3 -m pip install --user awsiotsdk
      Run: py -3 -u {artifacts:path}/pubsub_subscriber.py

The following example Python application demonstrates how to use the publish/subscribe IPC service to subscribe to messages from other components.

import concurrent.futures
import sys
import time
import traceback

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    SubscribeToTopicRequest,
    SubscriptionResponseMessage,
    UnauthorizedError
)

topic = "test/topic/python"
TIMEOUT = 10


class StreamHandler(client.SubscribeToTopicStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
        try:
            message = str(event.binary_message.message, "utf-8")
            print("Received new message: " + message)
        except:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        print("Received a stream error.", file=sys.stderr)
        traceback.print_exc()
        return False  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        print('Subscribe to topic stream closed.')


try:
    ipc_client = awsiot.greengrasscoreipc.connect()

    request = SubscribeToTopicRequest()
    request.topic = topic
    handler = StreamHandler()
    operation = ipc_client.new_subscribe_to_topic(handler)
    operation.activate(request)
    future_response = operation.get_response()

    try:
        future_response.result(TIMEOUT)
        print('Successfully subscribed to topic: ' + topic)
    except concurrent.futures.TimeoutError as e:
        print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
        raise e
    except UnauthorizedError as e:
        print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
        raise e
    except Exception as e:
        print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
        raise e

    # Keep the main thread alive, or the process will exit.
    try:
        while True:
            time.sleep(10)
    except InterruptedError:
        print('Subscribe interrupted.')
except Exception:
    print('Exception occurred when using IPC.', file=sys.stderr)
    traceback.print_exc()
    exit(1)
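
The handler callbacks in the subscriber above can be exercised without a running Greengrass core. The following is a minimal sketch, assuming you only want to check the decoding and error-handling logic. MessageCollector and fake_event are hypothetical stand-ins: the class reuses the callback names from the example but does not subclass the SDK handler, and the fake event only imitates the binary_message.message attribute path that the example reads.

```python
from types import SimpleNamespace


class MessageCollector:
    """Hypothetical stand-in for a stream handler; records instead of printing."""

    def __init__(self):
        self.messages = []
        self.errors = []

    def on_stream_event(self, event) -> None:
        try:
            # Same decoding step as the subscriber example.
            self.messages.append(str(event.binary_message.message, "utf-8"))
        except Exception as exc:
            # Record the failure so one bad payload does not stop processing.
            self.errors.append(exc)

    def on_stream_error(self, error: Exception) -> bool:
        self.errors.append(error)
        return False  # False keeps the stream open, as in the example.


def fake_event(payload: bytes):
    """Build an object exposing event.binary_message.message (illustrative only)."""
    return SimpleNamespace(binary_message=SimpleNamespace(message=payload))


if __name__ == "__main__":
    collector = MessageCollector()
    collector.on_stream_event(fake_event(b"hello"))
    collector.on_stream_event(fake_event(b"\xff"))  # Not valid UTF-8.
    print(collector.messages)
    print(len(collector.errors))
```

Keeping the decoding logic in a class that does not depend on the SDK is one way to unit test it on a development machine; the real handler can then delegate to it.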

The following example recipe allows the component to publish to all topics.

JSON

{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.PubSubPublisherCpp",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that publishes messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubPublisherCpp:pubsub:1": {
            "policyDescription": "Allows access to publish to all topics.",
            "operations": [
              "aws.greengrass#PublishToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "Run": "{artifacts:path}/greengrassv2_pubsub_publisher"
      },
      "Artifacts": [
        {
          "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher",
          "Permission": {
            "Execute": "OWNER"
          }
        }
      ]
    }
  ]
}

YAML

---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        com.example.PubSubPublisherCpp:pubsub:1:
          policyDescription: Allows access to publish to all topics.
          operations:
            - aws.greengrass#PublishToTopic
          resources:
            - "*"
Manifests:
  - Lifecycle:
      Run: "{artifacts:path}/greengrassv2_pubsub_publisher"
    Artifacts:
      - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher
        Permission:
          Execute: OWNER
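The recipe grants access to every topic through the `"*"` resource. As a rough sketch of how the same `accessControl` structure could be narrowed to specific topics, the snippet below generates the block programmatically. `build_pubsub_policy` is a hypothetical helper written for this illustration, and the `:pubsub:1` policy ID suffix simply mirrors the naming used in the recipe.

```python
import json
from typing import List


def build_pubsub_policy(component_name: str, operations: List[str],
                        resources: List[str]) -> dict:
    """Build an accessControl block shaped like the one in the example recipe."""
    policy_id = component_name + ":pubsub:1"  # naming copied from the recipe
    return {
        "accessControl": {
            "aws.greengrass.ipc.pubsub": {
                policy_id: {
                    "policyDescription": "Allows access to selected topics.",
                    "operations": operations,
                    "resources": resources,
                }
            }
        }
    }


if __name__ == "__main__":
    policy = build_pubsub_policy(
        "com.example.PubSubPublisherCpp",
        ["aws.greengrass#PublishToTopic"],
        ["test/topic/cpp"],  # only the topic the example publisher uses
    )
    print(json.dumps(policy, indent=2))
```

The printed JSON could be pasted under `DefaultConfiguration` in place of the wildcard policy when a component only needs a known set of topics.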

The following example C++ application demonstrates how to use the publish/subscribe IPC service to publish messages to other components.

#include <chrono>
#include <iostream>
#include <thread>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        std::cout << "OnConnectCallback" << std::endl;
    }

    void OnDisconnectCallback(RpcError error) override {
        std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
        exit(-1);
    }

    bool OnErrorCallback(RpcError error) override {
        std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
        return true;
    }
};

int main() {
    String message("Hello from the pub/sub publisher (C++).");
    String topic("test/topic/cpp");
    int timeout = 10;

    // Set up the IPC connection to the Greengrass nucleus.
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    // Publish the same message every 5 seconds.
    while (true) {
        PublishToTopicRequest request;
        Vector<uint8_t> messageData({message.begin(), message.end()});
        BinaryMessage binaryMessage;
        binaryMessage.SetMessage(messageData);
        PublishMessage publishMessage;
        publishMessage.SetBinaryMessage(binaryMessage);
        request.SetTopic(topic);
        request.SetPublishMessage(publishMessage);

        auto operation = ipcClient.NewPublishToTopic();
        auto activate = operation->Activate(request, nullptr);
        activate.wait();

        auto responseFuture = operation->GetResult();
        if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
            std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
            exit(-1);
        }

        auto response = responseFuture.get();
        if (response) {
            std::cout << "Successfully published to topic: " << topic << std::endl;
        } else {
            // An error occurred.
            std::cout << "Failed to publish to topic: " << topic << std::endl;
            auto errorType = response.GetResultType();
            if (errorType == OPERATION_ERROR) {
                auto *error = response.GetOperationError();
                std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
            } else {
                std::cout << "RPC error: " << response.GetRpcError() << std::endl;
            }
            exit(-1);
        }

        std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    return 0;
}

The following example recipe allows the component to subscribe to all topics.

JSON

{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.PubSubSubscriberCpp",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that subscribes to messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubSubscriberCpp:pubsub:1": {
            "policyDescription": "Allows access to subscribe to all topics.",
            "operations": [
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "Run": "{artifacts:path}/greengrassv2_pub_sub_subscriber"
      },
      "Artifacts": [
        {
          "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber",
          "Permission": {
            "Execute": "OWNER"
          }
        }
      ]
    }
  ]
}

YAML

---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        com.example.PubSubSubscriberCpp:pubsub:1:
          policyDescription: Allows access to subscribe to all topics.
          operations:
            - aws.greengrass#SubscribeToTopic
          resources:
            - "*"
Manifests:
  - Lifecycle:
      Run: "{artifacts:path}/greengrassv2_pub_sub_subscriber"
    Artifacts:
      - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber
        Permission:
          Execute: OWNER
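A subscriber deployed with a recipe that grants only `aws.greengrass#PublishToTopic` would presumably fail with an authorization error at run time, so it can help to inspect a recipe before deploying it. The sketch below lists the pub/sub operations a recipe grants. `granted_operations` is a hypothetical helper, and the embedded recipe is a trimmed copy of the one above; this is a plain dictionary lookup, not the authorization logic that Greengrass itself applies.

```python
import json

# Trimmed copy of the subscriber recipe; only the fields the check reads.
RECIPE_JSON = """
{
  "ComponentName": "com.example.PubSubSubscriberCpp",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.PubSubSubscriberCpp:pubsub:1": {
            "operations": ["aws.greengrass#SubscribeToTopic"],
            "resources": ["*"]
          }
        }
      }
    }
  }
}
"""


def granted_operations(recipe: dict) -> set:
    """Collect every operation named in the recipe's pub/sub access policies."""
    config = recipe.get("ComponentConfiguration", {}).get("DefaultConfiguration", {})
    policies = config.get("accessControl", {}).get("aws.greengrass.ipc.pubsub", {})
    operations = set()
    for policy in policies.values():
        operations.update(policy.get("operations", []))
    return operations


if __name__ == "__main__":
    ops = granted_operations(json.loads(RECIPE_JSON))
    print("Subscribe granted:", "aws.greengrass#SubscribeToTopic" in ops)
    print("Publish granted:", "aws.greengrass#PublishToTopic" in ops)
```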

The following example C++ application demonstrates how to use the publish/subscribe IPC service to subscribe to messages from other components.

#include <chrono>
#include <iostream>
#include <thread>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
    public:
        virtual ~SubscribeResponseHandler() {}

    private:
        // Called for each message; handles JSON payloads first, then binary ones.
        void OnStreamEvent(SubscriptionResponseMessage *response) override {
            auto jsonMessage = response->GetJsonMessage();
            if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
                auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
                std::cout << "Received new message: " << messageString << std::endl;
            } else {
                auto binaryMessage = response->GetBinaryMessage();
                if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
                    auto messageBytes = binaryMessage.value().GetMessage().value();
                    std::string messageString(messageBytes.begin(), messageBytes.end());
                    std::cout << "Received new message: " << messageString << std::endl;
                }
            }
        }

        bool OnStreamError(OperationError *error) override {
            std::cout << "Received an operation error: ";
            if (error->GetMessage().has_value()) {
                std::cout << error->GetMessage().value();
            }
            std::cout << std::endl;
            return false; // Return true to close stream, false to keep stream open.
        }

        void OnStreamClosed() override {
            std::cout << "Subscribe to topic stream closed." << std::endl;
        }
};

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        std::cout << "OnConnectCallback" << std::endl;
    }

    void OnDisconnectCallback(RpcError error) override {
        std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
        exit(-1);
    }

    bool OnErrorCallback(RpcError error) override {
        std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
        return true;
    }
};

int main() {
    String topic("test/topic/cpp");
    int timeout = 10;

    // Set up the IPC connection to the Greengrass nucleus.
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    SubscribeToTopicRequest request;
    request.SetTopic(topic);
    auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
    auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (response) {
        std::cout << "Successfully subscribed to topic: " << topic << std::endl;
    } else {
        // An error occurred.
        std::cout << "Failed to subscribe to topic: " << topic << std::endl;
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
        } else {
            std::cout << "RPC error: " << response.GetRpcError() << std::endl;
        }
        exit(-1);
    }

    // Keep the main thread alive, or the process will exit.
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }

    operation->Close();
    return 0;
}
