AWS IoT Core MQTTPubblicare/sottoscrivere messaggi - AWS IoT Greengrass

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

AWS IoT Core MQTTPubblicare/sottoscrivere messaggi

Il IPC servizio di AWS IoT Core MQTT messaggistica consente di inviare e ricevere MQTT messaggi da e verso AWS IoT Core. I componenti possono pubblicare messaggi AWS IoT Core e sottoscrivere argomenti per agire sui MQTT messaggi provenienti da altre fonti. Per ulteriori informazioni sull' AWS IoT Core implementazione diMQTT, consulta MQTTla Guida per gli AWS IoT Core sviluppatori.

Nota

Questo IPC servizio MQTT di messaggistica consente di scambiare messaggi con AWS IoT Core. Per ulteriori informazioni su come scambiare messaggi tra componenti, vederePubblicare/sottoscrivere messaggi locali.

SDKVersioni minime

Nella tabella seguente sono elencate le versioni minime da utilizzare per pubblicare e sottoscrivere MQTT i messaggi da e verso AWS IoT Core. SDK per dispositivi AWS IoT

Autorizzazione

Per utilizzare la AWS IoT Core MQTT messaggistica in un componente personalizzato, è necessario definire politiche di autorizzazione che consentano al componente di inviare e ricevere messaggi su argomenti. Per informazioni sulla definizione delle politiche di autorizzazione, vedereAutorizza i componenti a eseguire operazioni IPC.

Le politiche di autorizzazione per la AWS IoT Core MQTT messaggistica hanno le seguenti proprietà.

IPCidentificatore del servizio: aws.greengrass.ipc.mqttproxy

Operazione Descrizione Risorse

aws.greengrass#PublishToIoTCore

Consente a un componente di pubblicare messaggi AWS IoT Core sugli MQTT argomenti specificati.

Una stringa di argomento, ad esempiotest/topic, o * per consentire l'accesso a tutti gli argomenti. Puoi utilizzare i caratteri jolly degli MQTT argomenti (#e+) per abbinare più risorse.

aws.greengrass#SubscribeToIoTCore

Consente a un componente di sottoscrivere i AWS IoT Core messaggi relativi agli argomenti specificati.

Una stringa di argomento, ad esempiotest/topic, o * per consentire l'accesso a tutti gli argomenti. Puoi utilizzare i caratteri jolly degli MQTT argomenti (#e+) per abbinare più risorse.

*

Consente a un componente di pubblicare e sottoscrivere AWS IoT Core MQTT messaggi per gli argomenti specificati.

Una stringa di argomento, ad esempiotest/topic, o * per consentire l'accesso a tutti gli argomenti. Puoi utilizzare i caratteri jolly degli MQTT argomenti (#e+) per abbinare più risorse.

MQTTcaratteri jolly nelle politiche di autorizzazione AWS IoT Core MQTT

È possibile utilizzare i caratteri MQTT jolly nelle politiche di AWS IoT Core MQTT IPC autorizzazione. I componenti possono pubblicare e sottoscrivere argomenti che corrispondono al filtro degli argomenti consentito in una politica di autorizzazione. Ad esempio, se la politica di autorizzazione di un componente concede l'accesso atest/topic/#, il componente può sottoscriveretest/topic/#, pubblicare e sottoscriveretest/topic/filter.

Variabili di ricetta nelle politiche di AWS IoT Core MQTT autorizzazione

Se si utilizza la versione 2.6.0 o successiva del nucleo Greengrass, è possibile utilizzare la variabile recipe nelle politiche di autorizzazione. {iot:thingName} Questa funzionalità consente di configurare un'unica politica di autorizzazione per un gruppo di dispositivi principali, in cui ogni dispositivo principale può accedere solo agli argomenti che contengono il proprio nome. Ad esempio, è possibile consentire a un componente l'accesso alla seguente risorsa tematica.

devices/{iot:thingName}/messages

Per ulteriori informazioni, consulta Variabili di ricetta e Usa le variabili di ricetta negli aggiornamenti di fusione.

Esempi di politiche di autorizzazione

Puoi fare riferimento ai seguenti esempi di politiche di autorizzazione per aiutarti a configurare le politiche di autorizzazione per i tuoi componenti.

Esempio di politica di autorizzazione con accesso illimitato

Il seguente esempio di politica di autorizzazione consente a un componente di pubblicare e sottoscrivere tutti gli argomenti.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: com.example.MyIoTCorePubSubComponent:mqttproxy:1: policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - "*"
Esempio di politica di autorizzazione con accesso limitato

Il seguente esempio di politica di autorizzazione consente a un componente di pubblicare e sottoscrivere due argomenti denominati factory/1/events efactory/1/actions.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to factory 1 topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/actions", "factory/1/events" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to factory 1 topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/actions - factory/1/events
Esempio di politica di autorizzazione per un gruppo di dispositivi principali
Importante

Questo esempio utilizza una funzionalità disponibile per la versione 2.6.0 e successive del componente Greengrass nucleus. Greengrass nucleus v2.6.0 aggiunge il supporto per la maggior parte delle variabili di ricetta, ad esempio nelle configurazioni dei componenti. {iot:thingName}

Il seguente esempio di politica di autorizzazione consente a un componente di pubblicare e sottoscrivere un argomento che contiene il nome del dispositivo principale che esegue il componente.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/devices/{iot:thingName}/controls" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/devices/{iot:thingName}/controls

PublishToIoTCore

Pubblica un MQTT messaggio AWS IoT Core su un argomento.

Quando si pubblicano MQTT messaggi su AWS IoT Core, esiste una quota di 100 transazioni al secondo. Se si supera questa quota, i messaggi vengono messi in coda per l'elaborazione sul dispositivo Greengrass. È inoltre prevista una quota di 512 Kb di dati al secondo e una quota a livello di account di 20.000 pubblicazioni al secondo (2.000 in alcune). Regioni AWS Per ulteriori informazioni sui limiti e le quote dei broker di MQTT messaggi in AWS IoT Core, vedere AWS IoT Core Message Broker and Protocol Limits and Protocol.

Se superi queste quote, il dispositivo Greengrass limita la pubblicazione dei messaggi a. AWS IoT Core I messaggi vengono archiviati in uno spooler in memoria. Per impostazione predefinita, la memoria allocata allo spooler è 2,5 Mb. Se lo spooler si riempie, i nuovi messaggi vengono rifiutati. È possibile aumentare le dimensioni dello spooler. Per ulteriori informazioni, consulta Configurazione nella documentazione Nucleo Greengrass. Per evitare di riempire lo spooler e di dover aumentare la memoria allocata, limita le richieste di pubblicazione a non più di 100 richieste al secondo.

Quando la tua applicazione deve inviare messaggi a una velocità maggiore o messaggi più grandi, prendi in considerazione l'utilizzo di Stream manager per inviare messaggi a Kinesis Data Streams. Il componente stream manager è progettato per trasferire grandi volumi di dati a. Cloud AWS Per ulteriori informazioni, consulta Gestisci i flussi di dati sui dispositivi core Greengrass.

Richiesta

La richiesta di questa operazione ha i seguenti parametri:

topicName(Python:) topic_name

L'argomento su cui pubblicare il messaggio.

qos

Il MQTT QoS da usare. Questo enum ha QOS i seguenti valori:

  • AT_MOST_ONCE— QoS 0. Il MQTT messaggio viene recapitato al massimo una volta.

  • AT_LEAST_ONCE— QoS 1. Il MQTT messaggio viene recapitato almeno una volta.

payload

(Facoltativo) Il payload del messaggio sotto forma di blob.

Le seguenti funzionalità sono disponibili per la versione 2.10.0 e successive di quando si utilizza 5. Nucleo Greengrass MQTT Queste funzionalità vengono ignorate quando si utilizza 3.1.1. MQTT La tabella seguente elenca la versione minima del AWS IoT dispositivo SDK da utilizzare per accedere a queste funzionalità.

payloadFormat

(Facoltativo) Il formato del payload del messaggio. Se non si imposta ilpayloadFormat, si presume che il tipo sia. BYTES L'enum ha i seguenti valori:

  • BYTES— Il contenuto del payload è un blob binario.

  • UTF8— Il contenuto del payload è una UTF8 stringa di caratteri.

retain

(Facoltativo) Indica se impostare l'opzione di MQTT conservazione su true durante la pubblicazione.

userProperties

(Facoltativo) Un elenco di UserProperty oggetti specifici dell'applicazione da inviare. L'UserPropertyoggetto è definito come segue:

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(Facoltativo) Il numero di secondi prima che il messaggio scada e venga eliminato dal server. Se questo valore non è impostato, il messaggio non scade.

correlationData

(Facoltativo) Informazioni aggiunte alla richiesta che possono essere utilizzate per associare una richiesta a una risposta.

responseTopic

(Facoltativo) L'argomento da utilizzare per il messaggio di risposta.

contentType

(Facoltativo) Un identificatore specifico dell'applicazione del tipo di contenuto del messaggio.

Risposta

Questa operazione non fornisce alcuna informazione nella sua risposta.

Esempi

Gli esempi seguenti mostrano come chiamare questa operazione nel codice componente personalizzato.

Java (IPC client V2)
Esempio: pubblicare un messaggio
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.QOS; import java.nio.charset.StandardCharsets; public class PublishToIoTCore { public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest() .withTopicName(topic) .withPayload(message.getBytes(StandardCharsets.UTF_8)) .withQos(qos)); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
Esempio: pubblicare un messaggio
Nota

Questo esempio presuppone che si stia utilizzando la versione 1.5.4 o successiva di for SDK per dispositivi AWS IoT Python v2.

import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' payload = 'Hello, World' ipc_client = clientV2.GreengrassCoreIPCClientV2() resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload) ipc_client.close()
Java (IPC client V1)
Esempio: pubblicare un messaggio
Nota

Questo esempio utilizza una IPCUtils classe per creare una connessione al IPC servizio AWS IoT Greengrass Core. Per ulteriori informazioni, consulta Connect al IPC servizio AWS IoT Greengrass Core.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PublishToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); PublishToIoTCoreResponseHandler responseHandler = PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos); CompletableFuture<PublishToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) { PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()); } }
Python (IPC client V1)
Esempio: pubblicare un messaggio
Nota

Questo esempio presuppone che si stia utilizzando la versione 1.5.4 o successiva di for SDK per dispositivi AWS IoT Python v2.

import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT)
C++
Esempio: pubblicare un messaggio
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String message("Hello, World!"); String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
Esempio: pubblicare un messaggio
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.publishToIoTCore().then(r => console.log("Started workflow")); } private async publishToIoTCore() { try { const request: PublishToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); await this.ipcClient.publishToIoTCore(request); } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToIoTCore = new PublishToIoTCore();

SubscribeToIoTCore

Iscriviti ai MQTT messaggi di un AWS IoT Core argomento o di un filtro per argomenti. Il software AWS IoT Greengrass Core rimuove gli abbonamenti quando il componente raggiunge la fine del suo ciclo di vita.

Questa operazione è un'operazione di sottoscrizione in cui ci si iscrive a un flusso di messaggi di eventi. Per utilizzare questa operazione, definite un gestore di risposte di flusso con funzioni che gestiscono i messaggi di evento, gli errori e la chiusura dei flussi. Per ulteriori informazioni, consulta Iscriviti agli stream di IPC eventi.

Tipo di messaggio di evento: IoTCoreMessage

Richiesta

La richiesta di questa operazione ha i seguenti parametri:

topicName(Python:) topic_name

L'argomento a cui iscriversi. È possibile utilizzare i MQTT caratteri jolly degli argomenti (#e+) per sottoscrivere più argomenti.

qos

Il MQTT QoS da usare. Questo enum ha QOS i seguenti valori:

  • AT_MOST_ONCE— QoS 0. Il MQTT messaggio viene recapitato al massimo una volta.

  • AT_LEAST_ONCE— QoS 1. Il MQTT messaggio viene recapitato almeno una volta.

Risposta

La risposta di questa operazione contiene le seguenti informazioni:

messages

Il flusso di MQTT messaggi. Questo oggetto contiene IoTCoreMessage le seguenti informazioni:

message

Il MQTT messaggio. Questo oggetto contiene MQTTMessage le seguenti informazioni:

topicName(Python:) topic_name

L'argomento su cui è stato pubblicato il messaggio.

payload

(Facoltativo) Il payload del messaggio sotto forma di blob.

Le seguenti funzionalità sono disponibili per la versione 2.10.0 e successive di quando si utilizza 5. Nucleo Greengrass MQTT Queste funzionalità vengono ignorate quando si utilizza 3.1.1. MQTT La tabella seguente elenca la versione minima del AWS IoT dispositivo SDK da utilizzare per accedere a queste funzionalità.

payloadFormat

(Facoltativo) Il formato del payload del messaggio. Se non si imposta ilpayloadFormat, si presume che il tipo sia. BYTES L'enum ha i seguenti valori:

  • BYTES— Il contenuto del payload è un blob binario.

  • UTF8— Il contenuto del payload è una UTF8 stringa di caratteri.

retain

(Facoltativo) Indica se impostare l'opzione di MQTT conservazione su true durante la pubblicazione.

userProperties

(Facoltativo) Un elenco di UserProperty oggetti specifici dell'applicazione da inviare. L'UserPropertyoggetto è definito come segue:

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(Facoltativo) Il numero di secondi prima che il messaggio scada e venga eliminato dal server. Se questo valore non è impostato, il messaggio non scade.

correlationData

(Facoltativo) Informazioni aggiunte alla richiesta che possono essere utilizzate per associare una richiesta a una risposta.

responseTopic

(Facoltativo) L'argomento da utilizzare per il messaggio di risposta.

contentType

(Facoltativo) Un identificatore specifico dell'applicazione del tipo di contenuto del messaggio.

Esempi

Gli esempi seguenti mostrano come chiamare questa operazione nel codice componente personalizzato.

Java (IPC client V2)
Esempio: iscriversi ai messaggi
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; public class SubscribeToIoTCore { public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage -> System.out.printf("Received new message on topic %s: %s%n", ioTCoreMessage.getMessage().getTopicName(), new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8)); Optional<Function<Throwable, Boolean>> onStreamError = Optional.of(e -> { System.err.println("Received a stream error."); e.printStackTrace(); return false; }); Optional<Runnable> onStreamClosed = Optional.of(() -> System.out.println("Subscribe to IoT Core stream closed.")); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest() .withTopicName(topic) .withQos(qos); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler> streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed); streamingResponse.getResponse(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. while (true) { Thread.sleep(10000); } // To stop subscribing, close the stream. streamingResponse.getHandler().closeStream(); } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
Esempio: iscriversi ai messaggi
Nota

Questo esempio presuppone che si stia utilizzando la versione 1.5.4 o successiva di for SDK per dispositivi AWS IoT Python v2.

import threading import traceback import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' def on_stream_event(event): try: topic_name = event.message.topic_name message = str(event.message.payload, 'utf-8') print(f'Received new message on topic {topic_name}: {message}') except: traceback.print_exc() def on_stream_error(error): # Return True to close stream, False to keep stream open. return True def on_stream_closed(): pass ipc_client = clientV2.GreengrassCoreIPCClientV2() resp, operation = ipc_client.subscribe_to_iot_core( topic_name=topic, qos=qos, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed ) # Keep the main thread alive, or the process will exit. event = threading.Event() event.wait() # To stop subscribing, close the operation stream. operation.close() ipc_client.close()
Java (IPC client V1)
Esempio: iscriversi ai messaggi
Nota

Questo esempio utilizza una IPCUtils classe per creare una connessione al IPC servizio AWS IoT Greengrass Core. Per ulteriori informazioni, consulta Connect al IPC servizio AWS IoT Greengrass Core.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new SubscriptionResponseHandler(); SubscribeToIoTCoreResponseHandler responseHandler = SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos, streamResponseHandler); CompletableFuture<SubscribeToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) { SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String topic = ioTCoreMessage.getMessage().getTopicName(); String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; } @Override public void onStreamClosed() { System.out.println("Subscribe to IoT Core stream closed."); } } }
Python (IPC client V1)
Esempio: sottoscrizione ai messaggi
Nota

Questo esempio presuppone che si stia utilizzando la versione 1.5.4 o successiva di for SDK per dispositivi AWS IoT Python v2.

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") topic_name = event.message.topic_name # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++
Esempio: iscriversi ai messaggi
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string topicName = message.value().GetTopicName().value().c_str(); // Handle message. } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Esempio: iscriversi ai messaggi
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToIoTCore().then(r => console.log("Started workflow")); } private async subscribeToIoTCore() { try { const request: SubscribeToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); const streamingOperation = this.ipcClient.subscribeToIoTCore(request); streamingOperation.on('message', (message: IoTCoreMessage) => { // parse the message depending on your use cases, e.g. if (message.message && message.message.payload) { const receivedMessage = message.message.payload.toString(); } }); streamingOperation.on('streamError', (error : RpcError) => { // define your own error handling logic }); streamingOperation.on('ended', () => { // define your own logic }); await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToIoTCore = new SubscribeToIoTCore();

Esempi

Utilizza i seguenti esempi per imparare a utilizzare il AWS IoT Core MQTT IPC servizio nei tuoi componenti.

La seguente ricetta di esempio consente al componente di pubblicare su tutti gli argomenti.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_iotcore_publisher" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCorePublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes MQTT messages to IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCorePublisherCpp:mqttproxy:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToIoTCore resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_iotcore_publisher" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher Permission: Execute: OWNER

L'applicazione C++ di esempio seguente mostra come utilizzare il AWS IoT Core MQTT IPC servizio su cui pubblicare messaggi. AWS IoT Core

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

La seguente ricetta di esempio consente al componente di sottoscrivere tutti gli argomenti.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_iotcore_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCoreSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to MQTT messages from IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCoreSubscriberCpp:mqttproxy:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToIoTCore resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_iotcore_subscriber" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber Permission: Execute: OWNER

L'applicazione C++ di esempio seguente mostra come utilizzare il AWS IoT Core MQTT IPC servizio per iscriversi ai messaggi da. AWS IoT Core

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }