Úselo SDK para dispositivos con AWS IoT para comunicarse con el núcleo de Greengrass, otros componentes y AWS IoT Core - AWS IoT Greengrass

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Úselo SDK para dispositivos con AWS IoT para comunicarse con el núcleo de Greengrass, otros componentes y AWS IoT Core

Los componentes que se ejecutan en su dispositivo principal pueden utilizar la biblioteca AWS IoT Greengrass Core interprocess communication (IPC) del SDK para dispositivos con AWS IoT para comunicarse con el AWS IoT Greengrass núcleo y otros componentes de Greengrass. Para desarrollar y ejecutar los componentes personalizados que se utilizanIPC, debe utilizarlos SDK para dispositivos con AWS IoT para conectarse al IPC servicio AWS IoT Greengrass Core y realizar IPC las operaciones.

La IPC interfaz admite dos tipos de operaciones:

  • Solicitud/respuesta

    Los componentes envían una solicitud al IPC servicio y reciben una respuesta que contiene el resultado de la solicitud.

  • Suscripción

    Los componentes envían una solicitud de suscripción al IPC servicio y esperan recibir un flujo de mensajes de eventos como respuesta. Los componentes proporcionan un controlador de suscripciones que gestiona los mensajes de eventos, los errores y el cierre del flujo. SDK para dispositivos con AWS IoT Incluye una interfaz de controlador con la respuesta y los tipos de eventos correctos para cada IPC operación. Para obtener más información, consulte Suscríbase a las transmisiones de IPC eventos.

IPCversiones de cliente

En versiones posteriores de Java y PythonSDKs, AWS IoT Greengrass proporciona una versión mejorada del IPC cliente, denominada IPC cliente V2. IPCcliente V2:

  • Reduce la cantidad de código que hay que escribir para utilizar IPC las operaciones y ayuda a evitar los errores habituales que pueden producirse con el IPC cliente V1.

  • Realiza llamadas a los controladores de suscripciones en un hilo independiente, por lo que ahora puede ejecutar código de bloqueo, incluidas las llamadas a IPC funciones adicionales, en las devoluciones de llamadas a los controladores de suscripciones. IPCel cliente V1 utiliza el mismo hilo para comunicarse con el IPC servidor y para llamar a los gestores de suscripciones.

  • Permite llamar a las operaciones de suscripción mediante expresiones Lambda (Java) o funciones (Python). IPCel cliente V1 requiere que defina las clases de controladores de suscripciones.

  • Proporciona versiones síncronas y asíncronas de cada operación. IPC IPCel cliente V1 solo proporciona versiones asíncronas de cada operación.

Le recomendamos que utilice el IPC cliente V2 para aprovechar estas mejoras. Sin embargo, muchos ejemplos de esta documentación y de algunos contenidos en línea muestran únicamente cómo utilizar el IPC cliente V1. Puede utilizar los siguientes ejemplos y tutoriales para ver ejemplos de componentes que utilizan el IPC cliente V2:

Actualmente, la versión 2 SDK para dispositivos con AWS IoT para C++ solo es compatible con el IPC cliente V1.

Compatible con SDKs la comunicación entre procesos

Las IPC bibliotecas AWS IoT Greengrass principales se incluyen en las siguientes SDK para dispositivos con AWS IoT versiones.

Conéctese al IPC servicio AWS IoT Greengrass principal

Para utilizar la comunicación entre procesos en su componente personalizado, debe crear una conexión a un socket de IPC servidor que ejecute el software AWS IoT Greengrass Core. Realice las siguientes tareas para descargarlo y usarlo SDK para dispositivos con AWS IoT en el idioma que prefiera.

Para usar la versión 2 SDK para dispositivos con AWS IoT para Java (IPCcliente V2)
  1. Descargue el SDK para dispositivos con AWS IoT para Java v2 (versión 1.6.0 o posterior).

  2. Tome alguna de las siguientes medidas para ejecutar el código personalizado en su componente:

    • Cree su componente como un JAR archivo que incluya y ejecute este JAR archivo en la receta de su componente. SDK para dispositivos con AWS IoT

    • Defina el SDK para dispositivos con AWS IoT JAR como un artefacto de componente y añada ese artefacto a la ruta de clases cuando ejecute la aplicación en la receta de su componente.

  3. Usa el siguiente código para crear el cliente. IPC

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Para usar SDK para dispositivos con AWS IoT para Python v2 (IPCcliente V2)
  1. Descargue el SDK para dispositivos con AWS IoT para Python (versión 1.9.0 o posterior).

  2. Agregue los pasos SDK de instalación al ciclo de vida de la instalación en la receta de su componente.

  3. Cree una conexión con el IPC servicio AWS IoT Greengrass principal. Use el siguiente código para crear el IPC cliente.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Para compilar la SDK para dispositivos con AWS IoT versión 2 para C++, un dispositivo debe tener las siguientes herramientas:

  • C++ 11 o posterior

  • CMake3.1 o posterior

  • Uno de los siguientes copiladores:

    • GCC4.8 o posterior

    • Clang 3.9 o posterior

    • MSVC2015 o posterior

Para usar la versión 2 SDK para dispositivos con AWS IoT para C++
  1. Descargue el SDK para dispositivos con AWS IoT para C++ v2 (versión 1.17.0 o posterior).

  2. Siga las instrucciones de instalación incluidas en el README para compilar la versión 2 SDK para dispositivos con AWS IoT para C++ a partir del código fuente.

  3. En la herramienta de compilación de C++, vincula la IPC biblioteca Greengrass que creaste en el paso anterior. AWS::GreengrassIpc-cpp El siguiente CMakeLists.txt ejemplo vincula la IPC biblioteca Greengrass a un proyecto con el que se crea. CMake

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. En el código del componente, cree una conexión con el IPC servicio AWS IoT Greengrass Core para crear un IPC cliente (Aws::Greengrass::GreengrassCoreIpcClient). Debe definir un controlador del ciclo de vida de la IPC conexión que gestione los eventos de IPC conexión, desconexión y error. El siguiente ejemplo crea un IPC cliente y un controlador del ciclo de vida de la IPC conexión que se imprime cuando el IPC cliente se conecta, se desconecta y encuentra errores.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Para ejecutar el código personalizado en el componente, cree el código como un artefacto binario y ejecute el artefacto binario en la receta del componente. Establezca el Execute permiso del artefacto en para OWNER permitir que el software AWS IoT Greengrass Core ejecute el artefacto binario.

    La sección Manifests de la receta de su componente podría parecerse al siguiente ejemplo.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: Run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Para compilar la JavaScript versión 2 SDK para dispositivos con AWS IoT para usarla con Nodejs, un dispositivo debe tener las siguientes herramientas:

  • Node.JS 10.0 o posterior

    • Ejecute node -v para comprobar la versión de Node.

  • CMake3.1 o posterior

Para usar el SDK para dispositivos con AWS IoT para la JavaScript versión 2 (IPCcliente V1)
  1. Descargue el SDK para dispositivos con AWS IoT para la JavaScript versión 2 (versión 1.12.10 o posterior).

  2. Siga las instrucciones de instalación del README para compilar la versión 2 a partir del SDK para dispositivos con AWS IoT código fuente JavaScript.

  3. Cree una conexión al IPC servicio AWS IoT Greengrass principal. Complete los siguientes pasos para crear el IPC cliente y establecer una conexión.

  4. Utilice el siguiente código para crear el IPC cliente.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Utilice el siguiente código para establecer una conexión entre el componente y el núcleo de Greengrass.

    await client.connect();

Autoriza a los componentes a realizar IPC operaciones

Para permitir que sus componentes personalizados utilicen algunas IPC operaciones, debe definir políticas de autorización que permitan al componente realizar la operación en determinados recursos. Cada política de autorización define una lista de operaciones y una lista de recursos que la política permite. Por ejemplo, el IPC servicio de mensajería de publicación/suscripción define las operaciones de publicación y suscripción para los recursos temáticos. Puede utilizar el comodín * para permitir el acceso a todas las operaciones o a todos los recursos.

Las políticas de autorización se definen con el parámetro de configuración accessControl, que se puede establecer en la receta del componente o al implementar el componente. El accessControl objeto asigna los identificadores IPC del servicio a listas de políticas de autorización. Puede definir varias políticas de autorización para cada IPC servicio a fin de controlar el acceso. Cada política de autorización tiene un identificador de política, que debe ser único entre todos los componentes.

sugerencia

Para crear una política únicaIDs, puede combinar el nombre del componente, el nombre del IPC servicio y un contador. Por ejemplo, un componente denominado com.example.HelloWorld podría definir dos políticas de autorización de publicación o suscripción con lo siguiente: IDs

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Las políticas de autorización utilizan el siguiente formato. Este objeto es el parámetro de configuración accessControl.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Comodines en las políticas de autorización

Puede utilizar el * comodín en el resources elemento de las políticas de IPC autorización para permitir el acceso a varios recursos en una única política de autorización.

  • En todas las versiones del núcleo de Greengrass, puede especificar un solo carácter * como recurso para permitir el acceso a todos los recursos.

  • En el núcleo de Greengrass versión 2.6.0 y versiones posteriores, puede especificar el carácter * de un recurso para que coincida con cualquier combinación de caracteres. Por ejemplo, puede especificar factory/1/devices/Thermostat*/status para permitir el acceso a un tema de estado para todos los dispositivos de termostato de una fábrica, donde el nombre de cada dispositivo comience con Thermostat.

Al definir las políticas de autorización para el AWS IoT Core MQTT IPC servicio, también puede utilizar MQTT caracteres comodín (+y#) para hacer coincidir varios recursos. Para obtener más información, consulte los MQTTcaracteres comodín en las políticas de AWS IoT Core MQTT IPC autorización.

Variables de receta en las políticas de autorización

Si usa Greengrass nucleus v2.6.0 o posterior y establece la opción de interpolateComponentConfigurationconfiguración del núcleo de Greengrass entrue, puede usar la variable de receta en las políticas de autorización. {iot:thingName} Cuando necesite una política de autorización que incluya el nombre del dispositivo principal, como en el caso de MQTT temas o dispositivos ocultos, puede utilizar esta variable de receta para configurar una política de autorización única para un grupo de dispositivos principales. Por ejemplo, puede permitir que un componente acceda al siguiente recurso para IPC realizar operaciones ocultas.

$aws/things/{iot:thingName}/shadow/

Caracteres especiales en las políticas de autorización

Para especificar un literal * o un carácter ? en una política de autorización, debe utilizar una secuencia de escape. Las siguientes secuencias de escape indican al software AWS IoT Greengrass Core que utilice el valor literal en lugar del significado especial del personaje. Por ejemplo, el carácter * es un comodín que coincide con cualquier combinación de caracteres.

Carácter literal Secuencia de escape Notas

*

${*}

?

${?}

AWS IoT Greengrass actualmente no admite el ? comodín, que coincide con cualquier carácter individual.

$

${$}

Use esta secuencia de escape para hacer coincidir un recurso que contenga ${. Por ejemplo, para que coincida con un recurso denominado ${resourceName}, debe especificar ${$}{resourceName}. De lo contrario, para que coincida con un recurso que contiene $, puede usar un literal $, por ejemplo, para permitir el acceso a un tema que comience por $aws.

Ejemplos de políticas de autorización

Puede consultar los siguientes ejemplos de políticas de autorización con el fin de configurar las políticas de autorización para sus componentes.

ejemplo Ejemplo de receta de componentes con una política de autorización

El siguiente ejemplo de receta de componentes incluye un objeto accessControl que define una política de autorización. Esta política autoriza al componente com.example.HelloWorld a publicar en el tema test/topic.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/HelloWorld.jar
ejemplo Ejemplo de actualización de la configuración de un componente con una política de autorización

El siguiente ejemplo de actualización de configuración en una implementación especifica la configuración de un componente con un objeto accessControl que define una política de autorización. Esta política autoriza al componente com.example.HelloWorld a publicar en el tema test/topic.

Console
Configuración de combinación
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

El siguiente comando crea una implementación a un dispositivo principal.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

El hello-world-deployment.json archivo contiene el siguiente JSON documento.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

El siguiente CLI comando de Greengrass crea una implementación local en un dispositivo principal.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

El hello-world-configuration.json archivo contiene el siguiente JSON documento.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Suscríbase a las transmisiones de IPC eventos

Puede usar IPC las operaciones para suscribirse a las transmisiones de eventos en un dispositivo principal de Greengrass. Para utilizar una operación de suscripción, defina un gestor de suscripciones y cree una solicitud al IPC servicio. A continuación, el IPC cliente ejecuta las funciones del controlador de suscripciones cada vez que el dispositivo principal transmite un mensaje de evento a su componente.

Puede cerrar una suscripción para dejar de procesar los mensajes de eventos. Para ello, llame a closeStream() (Java), close() (Python) o Close() (C++) en el objeto de operación de suscripción que utilizó para abrir la suscripción.

El IPC servicio AWS IoT Greengrass principal admite las siguientes operaciones de suscripción:

Definición de controladores de suscripción

Para definir un controlador de suscripciones, defina las funciones de devolución de llamada que gestionen los mensajes de eventos, los errores y el cierre de flujos. Si usa el IPC cliente V1, debe definir estas funciones en una clase. Si usa el IPC cliente V2, que está disponible en versiones posteriores de Java y PythonSDKs, puede definir estas funciones sin crear una clase de controlador de suscripciones.

Java

Si usa el IPC cliente V1, debe implementar la software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> interfaz genérica. StreamEventTypees el tipo de mensaje de evento para la operación de suscripción. Defina las siguientes funciones para gestionar los mensajes de eventos, los errores y el cierre de flujos.

Si usa el IPC cliente V2, puede definir estas funciones fuera de una clase de controlador de suscripciones o usar expresiones lambda.

void onStreamEvent(StreamEventType event)

La llamada de retorno a la que llama el IPC cliente cuando recibe un mensaje de evento, como un MQTT mensaje o una notificación de actualización de un componente.

boolean onStreamError(Throwable error)

La llamada de retorno a la que el IPC cliente llama cuando se produce un error de transmisión.

Devuelve true para cerrar el flujo de suscripción como resultado del error, o devuelve false para mantener el flujo abierto.

void onStreamClosed()

La devolución de llamada a la que el IPC cliente llama cuando se cierra la transmisión.

Python

Si usa el IPC cliente V1, debe ampliar la clase de controlador de respuesta de la transmisión que corresponde a la operación de suscripción. SDK para dispositivos con AWS IoT Incluye una clase de controlador de suscripciones para cada operación de suscripción. StreamEventTypees el tipo de mensaje de evento para la operación de suscripción. Defina las siguientes funciones para gestionar los mensajes de eventos, los errores y el cierre de flujos.

Si usa el IPC cliente V2, puede definir estas funciones fuera de una clase de controlador de suscripciones o usar expresiones lambda.

def on_stream_event(self, event: StreamEventType) -> None

La llamada de retorno a la que llama el IPC cliente cuando recibe un mensaje de evento, como un MQTT mensaje o una notificación de actualización de un componente.

def on_stream_error(self, error: Exception) -> bool

La llamada de retorno a la que el IPC cliente llama cuando se produce un error de transmisión.

Devuelve true para cerrar el flujo de suscripción como resultado del error, o devuelve false para mantener el flujo abierto.

def on_stream_closed(self) -> None

La devolución de llamada a la que el IPC cliente llama cuando se cierra la transmisión.

C++

Implemente una clase que se derive de la clase del controlador de respuesta de flujo que corresponde a la operación de suscripción. SDK para dispositivos con AWS IoT Incluye una clase base de controlador de suscripciones para cada operación de suscripción. StreamEventTypees el tipo de mensaje de evento para la operación de suscripción. Defina las siguientes funciones para gestionar los mensajes de eventos, los errores y el cierre de flujos.

void OnStreamEvent(StreamEventType *event)

La llamada de retorno a la que llama el IPC cliente cuando recibe un mensaje de evento, como un MQTT mensaje o una notificación de actualización de un componente.

bool OnStreamError(OperationError *error)

La llamada de retorno a la que el IPC cliente llama cuando se produce un error de transmisión.

Devuelve true para cerrar el flujo de suscripción como resultado del error, o devuelve false para mantener el flujo abierto.

void OnStreamClosed()

La devolución de llamada a la que el IPC cliente llama cuando se cierra la transmisión.

JavaScript

Implemente una clase que se derive de la clase del controlador de respuesta de flujo que corresponde a la operación de suscripción. SDK para dispositivos con AWS IoT Incluye una clase base de controlador de suscripciones para cada operación de suscripción. StreamEventTypees el tipo de mensaje de evento para la operación de suscripción. Defina las siguientes funciones para gestionar los mensajes de eventos, los errores y el cierre de flujos.

on(event: 'ended', listener: StreamingOperationEndedListener)

La llamada de retorno a la que el IPC cliente llama cuando se cierra la transmisión.

on(event: 'streamError', listener: StreamingRpcErrorListener)

La devolución de llamada a la que el IPC cliente llama cuando se produce un error en la transmisión.

Devuelve true para cerrar el flujo de suscripción como resultado del error, o devuelve false para mantener el flujo abierto.

on(event: 'message', listener: (message: InboundMessageType) => void)

La llamada de retorno a la que el IPC cliente llama cuando recibe un mensaje de evento, como un MQTT mensaje o una notificación de actualización de un componente.

Ejemplos de controladores de suscripciones

En el siguiente ejemplo, se muestra cómo utilizar la operación SubscribeToTopic y un controlador de suscripciones para suscribirse a la mensajería de publicación y suscripción local.

Java (IPC client V2)
ejemplo Ejemplo: suscribirse a mensajería de publicación y suscripción local
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
ejemplo Ejemplo: suscribirse a mensajería de publicación y suscripción local
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
ejemplo Ejemplo: suscribirse a mensajería de publicación y suscripción local
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
ejemplo Ejemplo: suscribirse a mensajería de publicación y suscripción local
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

Prácticas recomendadas de IPC

Las prácticas recomendadas para su uso IPC en componentes personalizados difieren entre el IPC cliente V1 y el IPC cliente V2. Siga las prácticas recomendadas para la versión de IPC cliente que utilice.

IPC client V2

El IPC cliente V2 ejecuta las funciones de devolución de llamadas en un hilo independiente, por lo que, en comparación con el IPC cliente V1, hay menos instrucciones que seguir al usar IPC y escribir funciones de gestión de suscripciones.

  • Reutilice un cliente IPC

    Después de crear un IPC cliente, manténgalo abierto y reutilícelo para todas IPC las operaciones. La creación de varios clientes utiliza recursos adicionales y puede provocar pérdidas de recursos.

  • Tratamiento de excepciones

    El IPC cliente V2 registra las excepciones no detectadas en las funciones del gestor de suscripciones. Debe detectar las excepciones en las funciones de su controlador para gestionar los errores que se producen en su código.

IPC client V1

El IPC cliente V1 utiliza un único hilo que se comunica con el IPC servidor y llama a los gestores de suscripciones. Debe tener en cuenta este comportamiento sincrónico al escribir las funciones del controlador de suscripciones.

  • Reutilice un cliente IPC

    Después de crear un IPC cliente, manténgalo abierto y reutilícelo para todas IPC las operaciones. La creación de varios clientes utiliza recursos adicionales y puede provocar pérdidas de recursos.

  • Ejecución del código de bloqueo de forma asincrónica

    El IPC cliente V1 no puede enviar nuevas solicitudes ni procesar nuevos mensajes de eventos mientras el hilo esté bloqueado. Debe ejecutar el código de bloqueo en un subproceso independiente que ejecute desde la función de controlador. El código de bloqueo incluye llamadas sleep, bucles que se ejecutan de forma continua y solicitudes de E/S sincrónicas que tardan en completarse.

  • Envía nuevas IPC solicitudes de forma asíncrona

    El IPC cliente V1 no puede enviar una nueva solicitud desde las funciones del controlador de suscripciones, ya que la solicitud bloquea la función del controlador si esperas una respuesta. Debes enviar IPC las solicitudes en un hilo independiente que ejecutes desde la función de controlador.

  • Tratamiento de excepciones

    El IPC cliente V1 no gestiona las excepciones no detectadas en las funciones del gestor de suscripciones. Si su función de controlador genera una excepción, la suscripción se cierra y la excepción no aparece en los registros de sus componentes. Debe detectar las excepciones en las funciones de su controlador para mantener la suscripción abierta y registrar los errores que se produzcan en el código.