Publicación/suscripción de mensajes MQTT AWS IoT Core - AWS IoT Greengrass

Publicación/suscripción de mensajes MQTT AWS IoT Core

El servicio de IPC de mensajería MQTT AWS IoT Core le permite enviar y recibir mensajes MQTT desde y hacia AWS IoT Core. Los componentes pueden publicar mensajes en los temas AWS IoT Core y suscribirse a ellos para actuar en función de los mensajes MQTT de otros orígenes. Para obtener más información sobre la implementación de AWS IoT Core de MQTT, consulte MQTT en la Guía para desarrolladores de AWS IoT Core.

nota

Este servicio de IPC de mensajería MQTT le permite intercambiar mensajes con AWS IoT Core. Para obtener más información sobre cómo intercambiar mensajes entre componentes, consulte Publicar/suscribir mensajes locales.

Versiones mínimas de SDK

En la siguiente tabla se enumeran las versiones mínimas del SDK para dispositivos con AWS IoT que debe utilizar para publicar y suscribirse a mensajes de MQTT hacia y desde AWS IoT Core.

Autorización

Para utilizar la mensajería MQTT AWS IoT Core en un componente personalizado, debe definir políticas de autorización que permitan a su componente enviar y recibir mensajes sobre temas. Para obtener información sobre cómo definir las políticas de autorización, consulte Autorización de los componentes para realizar operaciones de IPC.

Las políticas de autorización para la mensajería MQTT AWS IoT Core tienen las siguientes propiedades.

Identificador de servicio IPC: aws.greengrass.ipc.mqttproxy

Operación Descripción Recursos

aws.greengrass#PublishToIoTCore

Permite que un componente publique mensajes a AWS IoT Core en los temas de MQTT que especifique.

Una cadena de temas, por ejemplo, test/topic, o * para permitir el acceso a todos los temas. Puede utilizar los caracteres comodín (# y +) de los temas de MQTT para hacer coincidir varios recursos.

aws.greengrass#SubscribeToIoTCore

Permite que un componente se suscriba a los mensajes en AWS IoT Core de los temas que especifique.

Una cadena de temas, por ejemplo, test/topic, o * para permitir el acceso a todos los temas. Puede utilizar los caracteres comodín (# y +) de los temas de MQTT para hacer coincidir varios recursos.

*

Permite que un componente publique y se suscriba a los mensajes de MQTT AWS IoT Core para los temas que especifique.

Una cadena de temas, por ejemplo, test/topic, o * para permitir el acceso a todos los temas. Puede utilizar los caracteres comodín (# y +) de los temas de MQTT para hacer coincidir varios recursos.

Los caracteres comodín de MQTT en las políticas de autorización de MQTT AWS IoT Core

Puede utilizar caracteres comodín de MQTT en las políticas de autorización de IPC de MQTT AWS IoT Core. Los componentes pueden publicar y suscribirse a temas que coincidan con el filtro de temas que usted permita en una política de autorización. Por ejemplo, si la política de autorización de un componente concede acceso a test/topic/#, el componente puede suscribirse a test/topic/#, publicar y suscribirse a test/topic/filter.

Variables de receta en las políticas de autorización de AWS IoT Core MQTT

Si usa la versión 2.6.0 o posterior del núcleo de Greengrass, puede usar la variable de receta {iot:thingName} en las políticas de autorización. Esta característica le permite configurar una política de autorización única para un grupo de dispositivos principales, de forma que cada dispositivo principal solo pueda acceder a los temas que contengan su propio nombre. Por ejemplo, puede permitir que un componente acceda al siguiente recurso de tema.

devices/{iot:thingName}/messages

Para obtener más información, consulte Variables de receta y Uso de variables de receta en las actualizaciones de combinación.

Ejemplos de políticas de autorización

Puede consultar los siguientes ejemplos de políticas de autorización con el fin de configurar las políticas de autorización para sus componentes.

ejemplo Ejemplo de política de autorización con acceso ilimitado

El siguiente ejemplo de política de autorización permite a un componente publicar y suscribirse a todos los temas.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: com.example.MyIoTCorePubSubComponent:mqttproxy:1: policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - "*"
ejemplo Ejemplo de política de autorización con acceso limitado

El siguiente ejemplo de política de autorización permite a un componente publicar y suscribirse a dos temas denominados factory/1/events y factory/1/actions.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to factory 1 topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/actions", "factory/1/events" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to factory 1 topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/actions - factory/1/events
ejemplo Ejemplo de política de autorización para un grupo de dispositivos principales
importante

En este ejemplo, se utiliza una característica que está disponible para la versión 2.6.0 y versiones posteriores del componente núcleo de Greengrass. El núcleo de Greengrass versión 2.6.0 suma compatibilidad con la mayoría de las variables de receta, por ejemplo: {iot:thingName}, en las configuraciones de componentes.

El siguiente ejemplo de política de autorización permite a un componente publicar y suscribirse a un tema que contenga el nombre del dispositivo principal que ejecuta el componente.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/devices/{iot:thingName}/controls" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/devices/{iot:thingName}/controls

PublishToIoTCore

Publica un mensaje MQTT para AWS IoT Core en un tema.

Al publicar mensajes MQTT en AWS IoT Core, hay una cuota de 100 transacciones por segundo. Si supera esta cuota, los mensajes se ponen en cola para su procesamiento en el dispositivo de Greengrass. También hay una cuota de 512 KB de datos por segundo y una cuota de 20 000 publicaciones por segundo en toda la cuenta (2000 en algunas Regiones de AWS). Para obtener más información sobre el límite del agente de mensajes MQTT en AWS IoT Core, consulte Límites de protocolo y cuotas del agente de mensajes de AWS IoT Core.

Si supera estas cuotas, el dispositivo de Greengrass limita la publicación de mensajes en AWS IoT Core. Los mensajes se almacenan en un spooler de memoria. De forma predeterminada, la memoria asignada al spooler es de 2,5 Mb. Si el spooler se llena, se rechazan los mensajes nuevos. Puede aumentar el tamaño del spooler. Para obtener más información, consulte Configuración en la documentación del Núcleo de Greengrass. Para evitar llenar el espacio y tener que aumentar la memoria asignada, limite las solicitudes de publicación a no más de 100 solicitudes por segundo.

Si su aplicación necesita enviar mensajes a una velocidad mayor o mensajes más grandes, considere utilizar el Administrador de transmisiones para enviar mensajes a Kinesis Data Streams. El componente administrador de flujos está diseñado para transferir grandes volúmenes de datos a la Nube de AWS. Para obtener más información, consulte Administración de flujos de datos en los dispositivos principales de Greengrass.

Solicitud

Esta solicitud de operación tiene los siguientes parámetros:

topicName (Python: topic_name)

El tema en el que se va a publicar el mensaje.

qos

La QoS de MQTT que se va a utilizar. Esta enumeración, QOS, tiene los siguientes valores:

  • AT_MOST_ONCE: QoS 0. El mensaje MQTT se entrega como máximo una vez.

  • AT_LEAST_ONCE: QoS 1. El mensaje MQTT se entrega como máximo una vez.

payload

(Opcional) El mensaje se carga como un blob.

Las siguientes características están disponibles para la versión 2.10.0 y versiones posteriores del Núcleo de Greengrass cuando se utiliza MQTT 5. Estas características no se tienen en cuenta al utilizar MQTT 3.1.1. En la siguiente tabla, se enumeran las versiones mínimas del SDK del dispositivo AWS IoT que debe utilizar para acceder a estas características.

SDK Versión mínima
AWS IoT Device SDK para Python v2 Versión 1.15.0
AWS IoT Device SDK para Java v2 Versión 1.13.0
AWS IoT Device SDK para C++ v2 Versión 1.24.0
SDK de AWS IoT Device para JavaScript v2 Versión 1.13.0
payloadFormat

(Opcional) El formato de la carga útil del mensaje. Si no establece el payloadFormat, se supone que el tipo es BYTES. Esta enumeración tiene los siguientes valores:

  • BYTES: el contenido de la carga útil es un blob binario.

  • UTF8: el contenido de la carga útil es una cadena de caracteres UTF8.

retain

(Opcional) Indica si se debe configurar la opción de retención de MQTT en true durante la publicación.

userProperties

(Opcional) Una lista de objetos UserProperty específicos de la aplicación para enviar. El objeto UserProperty se define de la siguiente manera:

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(Opcional) La cantidad de segundos antes de que el mensaje venza y el servidor lo elimine. Si no se establece este valor, el mensaje no vence.

correlationData

(Opcional) Información agregada a la solicitud que se puede usar para asociar una solicitud a una respuesta.

responseTopic

(Opcional) El tema que debe usarse para el mensaje de respuesta.

contentType

(Opcional) Un identificador específico de la aplicación del tipo de contenido del mensaje.

Respuesta

Esta operación no proporciona ninguna información en su respuesta.

Ejemplos

En los ejemplos siguientes, se muestra cómo llamar a esta operación en código de componente personalizado.

Java (IPC client V2)
ejemplo Ejemplo: publicar un mensaje
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.QOS; import java.nio.charset.StandardCharsets; public class PublishToIoTCore { public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest() .withTopicName(topic) .withPayload(message.getBytes(StandardCharsets.UTF_8)) .withQos(qos)); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
ejemplo Ejemplo: publicar un mensaje
nota

En este ejemplo se supone que está utilizando la versión 1.5.4 o posterior del SDK para dispositivos con AWS IoT para Python versión 2.

import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' payload = 'Hello, World' ipc_client = clientV2.GreengrassCoreIPCClientV2() resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload) ipc_client.close()
Java (IPC client V1)
ejemplo Ejemplo: publicar un mensaje
nota

En este ejemplo, se utiliza una clase IPCUtils para crear una conexión con el servicio AWS IoT Greengrass Core IPC. Para obtener más información, consulte Conexión al servicio de IPC de AWS IoT Greengrass Core.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PublishToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); PublishToIoTCoreResponseHandler responseHandler = PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos); CompletableFuture<PublishToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) { PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()); } }
Python (IPC client V1)
ejemplo Ejemplo: publicar un mensaje
nota

En este ejemplo se supone que está utilizando la versión 1.5.4 o posterior del SDK para dispositivos con AWS IoT para Python versión 2.

import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT)
C++
ejemplo Ejemplo: publicar un mensaje
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String message("Hello, World!"); String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
ejemplo Ejemplo: publicar un mensaje
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.publishToIoTCore().then(r => console.log("Started workflow")); } private async publishToIoTCore() { try { const request: PublishToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); await this.ipcClient.publishToIoTCore(request); } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToIoTCore = new PublishToIoTCore();

SubscribeToIoTCore

Suscríbase a los mensajes de MQTT de AWS IoT Core en un tema o filtro de temas. El software AWS IoT Greengrass Core elimina las suscripciones cuando el componente llega al final de su ciclo de vida.

Esta es una operación de suscripción en la que se suscribe a un flujo de mensajes de eventos. Para usar esta operación, defina un identificador de respuesta de flujo con funciones que gestionen los mensajes de eventos, los errores y el cierre del flujo. Para obtener más información, consulte Suscripción a los flujos de eventos de IPC.

Tipo de mensaje del evento: IoTCoreMessage

Solicitud

Esta solicitud de operación tiene los siguientes parámetros:

topicName (Python: topic_name)

El tema al que se suscribe. Puede utilizar los comodines (# y +) de los temas de MQTT para suscribirse a varios temas.

qos

La QoS de MQTT que se va a utilizar. Esta enumeración, QOS, tiene los siguientes valores:

  • AT_MOST_ONCE: QoS 0. El mensaje MQTT se entrega como máximo una vez.

  • AT_LEAST_ONCE: QoS 1. El mensaje MQTT se entrega como máximo una vez.

Respuesta

Esta respuesta de operación contiene la siguiente información:

messages

El flujo de mensajes MQTT. Este objeto, IoTCoreMessage, contiene la siguiente información:

message

El mensaje MQTT. Este objeto, MQTTMessage, contiene la siguiente información:

topicName (Python: topic_name)

El tema en el que se publicó el mensaje.

payload

(Opcional) El mensaje se carga como un blob.

Las siguientes características están disponibles para la versión 2.10.0 y versiones posteriores del Núcleo de Greengrass cuando se utiliza MQTT 5. Estas características no se tienen en cuenta al utilizar MQTT 3.1.1. En la siguiente tabla, se enumeran las versiones mínimas del SDK del dispositivo AWS IoT que debe utilizar para acceder a estas características.

SDK Versión mínima
AWS IoT Device SDK para Python v2 Versión 1.15.0
AWS IoT Device SDK para Java v2 Versión 1.13.0
AWS IoT Device SDK para C++ v2 Versión 1.24.0
SDK de AWS IoT Device para JavaScript v2 Versión 1.13.0
payloadFormat

(Opcional) El formato de la carga útil del mensaje. Si no establece el payloadFormat, se supone que el tipo es BYTES. Esta enumeración tiene los siguientes valores:

  • BYTES: el contenido de la carga útil es un blob binario.

  • UTF8: el contenido de la carga útil es una cadena de caracteres UTF8.

retain

(Opcional) Indica si se debe configurar la opción de retención de MQTT en true durante la publicación.

userProperties

(Opcional) Una lista de objetos UserProperty específicos de la aplicación para enviar. El objeto UserProperty se define de la siguiente manera:

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(Opcional) La cantidad de segundos antes de que el mensaje venza y el servidor lo elimine. Si no se establece este valor, el mensaje no vence.

correlationData

(Opcional) Información agregada a la solicitud que se puede usar para asociar una solicitud a una respuesta.

responseTopic

(Opcional) El tema que debe usarse para el mensaje de respuesta.

contentType

(Opcional) Un identificador específico de la aplicación del tipo de contenido del mensaje.

Ejemplos

En los ejemplos siguientes, se muestra cómo llamar a esta operación en código de componente personalizado.

Java (IPC client V2)
ejemplo Ejemplo: suscribirse a los mensajes
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; public class SubscribeToIoTCore { public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage -> System.out.printf("Received new message on topic %s: %s%n", ioTCoreMessage.getMessage().getTopicName(), new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8)); Optional<Function<Throwable, Boolean>> onStreamError = Optional.of(e -> { System.err.println("Received a stream error."); e.printStackTrace(); return false; }); Optional<Runnable> onStreamClosed = Optional.of(() -> System.out.println("Subscribe to IoT Core stream closed.")); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest() .withTopicName(topic) .withQos(qos); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler> streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed); streamingResponse.getResponse(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. while (true) { Thread.sleep(10000); } // To stop subscribing, close the stream. streamingResponse.getHandler().closeStream(); } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
ejemplo Ejemplo: suscribirse a los mensajes
nota

En este ejemplo se supone que está utilizando la versión 1.5.4 o posterior del SDK para dispositivos con AWS IoT para Python versión 2.

import threading import traceback import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' def on_stream_event(event): try: topic_name = event.message.topic_name message = str(event.message.payload, 'utf-8') print(f'Received new message on topic {topic_name}: {message}') except: traceback.print_exc() def on_stream_error(error): # Return True to close stream, False to keep stream open. return True def on_stream_closed(): pass ipc_client = clientV2.GreengrassCoreIPCClientV2() resp, operation = ipc_client.subscribe_to_iot_core( topic_name=topic, qos=qos, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed ) # Keep the main thread alive, or the process will exit. event = threading.Event() event.wait() # To stop subscribing, close the operation stream. operation.close() ipc_client.close()
Java (IPC client V1)
ejemplo Ejemplo: suscribirse a los mensajes
nota

En este ejemplo, se utiliza una clase IPCUtils para crear una conexión con el servicio AWS IoT Greengrass Core IPC. Para obtener más información, consulte Conexión al servicio de IPC de AWS IoT Greengrass Core.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new SubscriptionResponseHandler(); SubscribeToIoTCoreResponseHandler responseHandler = SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos, streamResponseHandler); CompletableFuture<SubscribeToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) { SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String topic = ioTCoreMessage.getMessage().getTopicName(); String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; } @Override public void onStreamClosed() { System.out.println("Subscribe to IoT Core stream closed."); } } }
Python (IPC client V1)
ejemplo Ejemplo: suscribirse a los mensajes
nota

En este ejemplo se supone que está utilizando la versión 1.5.4 o posterior del SDK para dispositivos con AWS IoT para Python versión 2.

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") topic_name = event.message.topic_name # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++
ejemplo Ejemplo: suscribirse a los mensajes
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string topicName = message.value().GetTopicName().value().c_str(); // Handle message. } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
ejemplo Ejemplo: suscribirse a los mensajes
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToIoTCore().then(r => console.log("Started workflow")); } private async subscribeToIoTCore() { try { const request: SubscribeToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); const streamingOperation = this.ipcClient.subscribeToIoTCore(request); streamingOperation.on('message', (message: IoTCoreMessage) => { // parse the message depending on your use cases, e.g. if (message.message && message.message.payload) { const receivedMessage = message.message.payload.toString(); } }); streamingOperation.on('streamError', (error : RpcError) => { // define your own error handling logic }); streamingOperation.on('ended', () => { // define your own logic }); await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToIoTCore = new SubscribeToIoTCore();

Ejemplos

Utilice los siguientes ejemplos para aprender a utilizar el servicio IPC de AWS IoT Core MQTT en sus componentes.

La siguiente receta de ejemplo permite que el componente publique en todos los temas.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_iotcore_publisher" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCorePublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes MQTT messages to IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCorePublisherCpp:mqttproxy:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToIoTCore resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_iotcore_publisher" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher Permission: Execute: OWNER

El siguiente ejemplo de aplicación de C++ demuestra cómo utilizar el servicio IPC de AWS IoT Core MQTT para publicar mensajes en AWS IoT Core.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

La siguiente receta de ejemplo permite que el componente se suscriba a todos los temas.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_iotcore_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCoreSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to MQTT messages from IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCoreSubscriberCpp:mqttproxy:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToIoTCore resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_iotcore_subscriber" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber Permission: Execute: OWNER

El siguiente ejemplo de aplicación de C++ demuestra cómo utilizar el servicio IPC de AWS IoT Core MQTT para suscribirse a los mensajes de AWS IoT Core.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }