Use o AWS IoT Device SDK para se comunicar com o núcleo do Greengrass, outros componentes e AWS IoT Core - AWS IoT Greengrass

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Use o AWS IoT Device SDK para se comunicar com o núcleo do Greengrass, outros componentes e AWS IoT Core

Os componentes em execução em seu dispositivo principal podem usar a biblioteca AWS IoT Greengrass Core interprocess communication (IPC) no AWS IoT Device SDK para se comunicar com o AWS IoT Greengrass núcleo e outros componentes do Greengrass. Para desenvolver e executar componentes personalizados que usamIPC, você deve usar o AWS IoT Device SDK para se conectar ao IPC serviço AWS IoT Greengrass principal e realizar IPC operações.

A IPC interface suporta dois tipos de operações:

  • Resposta/solicitação

    Os componentes enviam uma solicitação ao IPC serviço e recebem uma resposta que contém o resultado da solicitação.

  • Assinatura

    Os componentes enviam uma solicitação de assinatura ao IPC serviço e esperam um fluxo de mensagens de eventos em resposta. Os componentes fornecem um manipulador de assinaturas que lida com mensagens de eventos, erros e encerramento de fluxos. AWS IoT Device SDK Isso inclui uma interface de manipulador com os tipos corretos de resposta e evento para cada IPC operação. Para obter mais informações, consulte Inscreva-se em transmissões de IPC eventos.

IPCversões do cliente

Nas versões posteriores do Java e do PythonSDKs, AWS IoT Greengrass fornece uma versão aprimorada do IPC cliente, chamada IPC cliente V2. IPCcliente V2:

  • Reduz a quantidade de código que você precisa escrever para usar IPC as operações e ajuda a evitar erros comuns que podem ocorrer com o IPC cliente V1.

  • Chama os retornos de chamada do manipulador de assinatura em um thread separado, para que agora você possa executar o código de bloqueio, incluindo chamadas de IPC função adicionais, nos retornos de chamada do manipulador de assinatura. IPCO cliente V1 usa o mesmo thread para se comunicar com o IPC servidor e chamar os retornos de chamada do manipulador de assinaturas.

  • Permite chamar operações de assinatura usando expressões Lambda (Java) ou funções (Python). IPCO cliente V1 exige que você defina classes de manipuladores de assinaturas.

  • Fornece versões síncronas e assíncronas de cada operação. IPC IPCO cliente V1 fornece somente versões assíncronas de cada operação.

Recomendamos que você use IPC o cliente V2 para aproveitar essas melhorias. No entanto, muitos exemplos nesta documentação e em alguns conteúdos on-line demonstram somente como usar o IPC cliente V1. Você pode usar os exemplos e tutoriais a seguir para ver exemplos de componentes que usam o IPC cliente V2:

Atualmente, o AWS IoT Device SDK for C++ v2 suporta somente o IPC cliente V1.

Compatível com SDKs comunicação entre processos

As IPC bibliotecas AWS IoT Greengrass principais estão incluídas nas seguintes AWS IoT Device SDK versões.

Conecte-se ao IPC serviço AWS IoT Greengrass principal

Para usar a comunicação entre processos em seu componente personalizado, você deve criar uma conexão com um soquete de IPC servidor executado pelo software AWS IoT Greengrass Core. Conclua as tarefas a seguir para baixar e usar o AWS IoT Device SDK no idioma de sua escolha.

Para usar o AWS IoT Device SDK para Java v2 (IPCcliente V2)
  1. Faça o download do AWS IoT Device SDK para Java v2 (v1.6.0 ou posterior).

  2. Para executar o código personalizado em seu componente, faça o seguinte:

    • Crie seu componente como um JAR arquivo que inclua AWS IoT Device SDK o. e execute esse JAR arquivo na receita do componente.

    • Defina o AWS IoT Device SDK JAR como um artefato de componente e adicione esse artefato ao classpath ao executar seu aplicativo na receita do componente.

  3. Use o código a seguir para criar o IPC cliente.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Para usar o AWS IoT Device SDK para Python v2 (cliente V2) IPC
  1. Faça download do AWS IoT Device SDK para Python (v1.9.0 ou posterior).

  2. Adicione as etapas SDK de instalação ao ciclo de vida da instalação na receita do seu componente.

  3. Crie uma conexão com o IPC serviço AWS IoT Greengrass principal. Use o código a seguir para criar o IPC cliente.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Para criar a AWS IoT Device SDK v2 para C++, um dispositivo deve ter as seguintes ferramentas:

  • C++ 11 ou posterior

  • CMake3.1 ou posterior

  • Um dos seguintes compiladores:

    • GCC4.8 ou posterior

    • Clang 3.9 ou posterior

    • MSVC2015 ou mais tarde

Para usar o AWS IoT Device SDK para C++ v2
  1. Faça download do AWS IoT Device SDK para C++ v2 (v1.17.0 ou posterior).

  2. Siga as instruções de instalação em README para compilar o AWS IoT Device SDK para C++ v2 a partir do código-fonte.

  3. Em sua ferramenta de criação em C++, vincule a biblioteca IPC GreengrassAWS::GreengrassIpc-cpp,, que você criou na etapa anterior. O CMakeLists.txt exemplo a seguir vincula a IPC biblioteca Greengrass a um projeto com o qual você constrói. CMake

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. No código do componente, crie uma conexão com o IPC serviço AWS IoT Greengrass principal para criar um IPC cliente (Aws::Greengrass::GreengrassCoreIpcClient). Você deve definir um manipulador do ciclo de vida da IPC conexão que manipule eventos de IPC conexão, desconexão e erro. O exemplo a seguir cria um IPC cliente e um manipulador do ciclo de vida da IPC conexão que imprime quando o IPC cliente se conecta, desconecta e encontra erros.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Para executar seu código personalizado em seu componente, crie seu código como um artefato binário e execute o artefato binário em sua fórmula de componente. Defina a Execute permissão do artefato OWNER para permitir que o software AWS IoT Greengrass Core execute o artefato binário.

    A seção Manifests da fórmula pode parecer com o exemplo a seguir.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: Run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Para criar o AWS IoT Device SDK for JavaScript v2 para uso com o NodeJS, um dispositivo deve ter as seguintes ferramentas:

  • NodeJS 10.0 ou posterior

    • Execute node -v para verificar a versão do Node.

  • CMake3.1 ou posterior

Para usar o AWS IoT Device SDK for JavaScript v2 (IPCcliente V1)
  1. Baixe o AWS IoT Device SDK para JavaScript v2 (v1.12.10 ou posterior).

  2. Siga as instruções de instalação em README para compilar o AWS IoT Device SDK for JavaScript v2 a partir do código-fonte.

  3. Crie uma conexão com o IPC serviço AWS IoT Greengrass principal. Conclua as etapas a seguir para criar o IPC cliente e estabelecer uma conexão.

  4. Use o código a seguir para criar o IPC cliente.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Use o código a seguir para estabelecer uma conexão do seu componente com o núcleo do Greengrass.

    await client.connect();

Autorizar componentes a realizar operações IPC

Para permitir que seus componentes personalizados usem algumas IPC operações, você deve definir políticas de autorização que permitam que o componente execute a operação em determinados recursos. Cada política de autorização define uma lista de operações e uma lista de recursos que a política permite. Por exemplo, o IPC serviço de mensagens de publicação/assinatura define operações de publicação e assinatura para recursos de tópicos. É possível especificar o curinga * para permitir o acesso a todas as operações ou a todos os recursos.

Você define políticas de autorização com o parâmetro de configuração accessControl, que pode ser definido na fórmula do componente ou ao implantar o componente. O accessControl objeto mapeia identificadores IPC de serviço para listas de políticas de autorização. Você pode definir várias políticas de autorização para cada IPC serviço para controlar o acesso. Cada política de autorização tem um ID de política, que deve ser exclusivo entre todos os componentes.

dica

Para criar uma política exclusivaIDs, você pode combinar o nome do componente, o nome do IPC serviço e um contador. Por exemplo, um componente chamado com.example.HelloWorld pode definir duas políticas de autorização de publicação/assinatura com o seguinte: IDs

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

As políticas de autorização usam o formato a seguir. Esse objeto é o parâmetro de configuração accessControl.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Curingas nas políticas de autorização

Você pode usar o * caractere curinga no resources elemento das políticas de IPC autorização para permitir o acesso a vários recursos em uma única política de autorização.

  • Em todas as versões do núcleo do Greengrass, você pode especificar um único caractere * como recurso para permitir o acesso a todos os recursos.

  • No Núcleo do Greengrass v2.6.0 e versões posteriores, você pode especificar o caractere * em um recurso para corresponder a qualquer combinação de caracteres. Por exemplo, você pode especificar factory/1/devices/Thermostat*/status para permitir o acesso a um tópico de status para todos os dispositivos de termostato em uma fábrica, onde o nome de cada dispositivo começa com Thermostat.

Ao definir políticas de autorização para o AWS IoT Core MQTT IPC serviço, você também pode usar MQTT curingas (+e#) para corresponder a vários recursos. Para obter mais informações, consulte MQTTcuringas nas políticas de AWS IoT Core MQTT IPC autorização.

Variáveis de fórmula nas políticas de autorização

Se você usar o Greengrass nucleus v2.6.0 ou posterior e definir a opção de interpolateComponentConfigurationconfiguração do Greengrass nucleus comotrue, poderá usar a variável de receita nas políticas de autorização. {iot:thingName} Quando precisar de uma política de autorização que inclua o nome do dispositivo principal, como para MQTT tópicos ou sombras do dispositivo, você pode usar essa variável de receita para configurar uma única política de autorização para um grupo de dispositivos principais. Por exemplo, você pode permitir que um componente acesse o seguinte recurso para IPC operações de sombra.

$aws/things/{iot:thingName}/shadow/

Caracteres especiais em políticas de autorização

Para especificar um caractere * ou ? literal em uma política de autorização, você deve usar uma sequência de escape. As sequências de escape a seguir instruem o software AWS IoT Greengrass Core a usar o valor literal em vez do significado especial do caractere. Por exemplo, o caractere * é um curinga que corresponde a qualquer combinação de caracteres.

Caractere literal Sequência de escape Observações

*

${*}

?

${?}

AWS IoT Greengrass atualmente não suporta o ? curinga, que corresponde a um único caractere.

$

${$}

Use essa sequência de escape para corresponder a um recurso que contém ${. Por exemplo, para corresponder a um recurso chamado ${resourceName}, você deve especificar ${$}{resourceName}. Caso contrário, para corresponder a um recurso que contém $, você pode usar um $ literal, como para permitir acesso a um tópico que comece com $aws.

Exemplos de política de autorização

Consulte os exemplos de política de autorização a seguir para configurar políticas de autorização para seus componentes.

exemplo Exemplo de fórmula de componente com uma política de autorização

O exemplo de fórmula de componente a seguir inclui um objeto accessControl que define uma política de autorização. Essa política autoriza o componente com.example.HelloWorld a publicar no tópico test/topic.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/HelloWorld.jar
exemplo Exemplo de atualização da configuração do componente com uma política de autorização

O exemplo de atualização de configuração a seguir em uma implantação especifica a configuração de um componente com um objeto accessControl que define uma política de autorização. Essa política autoriza o componente com.example.HelloWorld a publicar no tópico test/topic.

Console
Configuração a ser mesclada
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

O comando a seguir cria uma implantação em um dispositivo principal.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

O hello-world-deployment.json arquivo contém o seguinte JSON documento.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

O CLI comando Greengrass a seguir cria uma implantação local em um dispositivo principal.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

O hello-world-configuration.json arquivo contém o seguinte JSON documento.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Inscreva-se em transmissões de IPC eventos

Você pode usar IPC as operações para assinar fluxos de eventos em um dispositivo principal do Greengrass. Para usar uma operação de assinatura, defina um manipulador de assinatura e crie uma solicitação para o IPC serviço. Em seguida, o IPC cliente executa as funções do manipulador de assinaturas toda vez que o dispositivo principal transmite uma mensagem de evento para seu componente.

Você pode fechar uma assinatura para interromper o processamento de mensagens de eventos. Para fazer isso, chame closeStream() (Java), close() (Python) ou Close() (C++) no objeto de operação de assinatura que você usou para abrir a assinatura.

O IPC serviço AWS IoT Greengrass principal oferece suporte às seguintes operações de assinatura:

Definir manipuladores de assinaturas

Para definir um manipulador de assinatura, defina funções de retorno de chamada que manipulem mensagens de eventos, erros e encerramento de fluxo. Se você usar o IPC cliente V1, deverá definir essas funções em uma classe. Se você usa o IPC cliente V2, que está disponível em versões posteriores do Java e do SDKs Python, você pode definir essas funções sem criar uma classe de manipulador de assinatura.

Java

Se você usa o IPC cliente V1, deve implementar a software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> interface genérica. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

Se você usa o IPC cliente V2, pode definir essas funções fora de uma classe de manipulador de assinatura ou usar expressões lambda.

void onStreamEvent(StreamEventType event)

O retorno de chamada que o IPC cliente chama quando recebe uma mensagem de evento, como uma MQTT mensagem ou uma notificação de atualização de componente.

boolean onStreamError(Throwable error)

O retorno de chamada que o IPC cliente chama quando ocorre um erro de stream.

Retorne “true” para fechar o fluxo de assinatura como resultado do erro ou retorne “false” para manter o fluxo aberto.

void onStreamClosed()

O retorno de chamada que o IPC cliente chama quando o stream é encerrado.

Python

Se você usar o IPC cliente V1, deverá estender a classe do manipulador de resposta de fluxo que corresponde à operação de assinatura. AWS IoT Device SDK Isso inclui uma classe de gerenciador de assinaturas para cada operação de assinatura. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

Se você usa o IPC cliente V2, pode definir essas funções fora de uma classe de manipulador de assinatura ou usar expressões lambda.

def on_stream_event(self, event: StreamEventType) -> None

O retorno de chamada que o IPC cliente chama quando recebe uma mensagem de evento, como uma MQTT mensagem ou uma notificação de atualização de componente.

def on_stream_error(self, error: Exception) -> bool

O retorno de chamada que o IPC cliente chama quando ocorre um erro de stream.

Retorne “true” para fechar o fluxo de assinatura como resultado do erro ou retorne “false” para manter o fluxo aberto.

def on_stream_closed(self) -> None

O retorno de chamada que o IPC cliente chama quando o stream é encerrado.

C++

Implemente uma classe derivada da classe do manipulador de resposta de fluxo que corresponda à operação de assinatura. AWS IoT Device SDK Isso inclui uma classe base de manipulador de assinaturas para cada operação de assinatura. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

void OnStreamEvent(StreamEventType *event)

O retorno de chamada que o IPC cliente chama quando recebe uma mensagem de evento, como uma MQTT mensagem ou uma notificação de atualização de componente.

bool OnStreamError(OperationError *error)

O retorno de chamada que o IPC cliente chama quando ocorre um erro de stream.

Retorne “true” para fechar o fluxo de assinatura como resultado do erro ou retorne “false” para manter o fluxo aberto.

void OnStreamClosed()

O retorno de chamada que o IPC cliente chama quando o stream é encerrado.

JavaScript

Implemente uma classe derivada da classe do manipulador de resposta de fluxo que corresponda à operação de assinatura. AWS IoT Device SDK Isso inclui uma classe base de manipulador de assinaturas para cada operação de assinatura. StreamEventTypeé o tipo de mensagem de evento para a operação de assinatura. Defina as funções a seguir para lidar com mensagens de eventos, erros e encerramento de fluxo.

on(event: 'ended', listener: StreamingOperationEndedListener)

O retorno de chamada que o IPC cliente chama quando o stream é encerrado.

on(event: 'streamError', listener: StreamingRpcErrorListener)

O retorno de chamada que o IPC cliente chama quando ocorre um erro de stream.

Retorne “true” para fechar o fluxo de assinatura como resultado do erro ou retorne “false” para manter o fluxo aberto.

on(event: 'message', listener: (message: InboundMessageType) => void)

O retorno de chamada que o IPC cliente chama quando recebe uma mensagem de evento, como uma MQTT mensagem ou uma notificação de atualização de componente.

Exemplo de manipuladores de assinatura

O exemplo a seguir demonstra como usar a operação SubscribeToTopic e um manipulador de assinaturas para assinar o sistema local de publicação e assinatura de mensagens.

Java (IPC client V2)
exemplo Exemplo: assinar mensagens locais de publicação e assinatura
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
exemplo Exemplo: assinar mensagens locais de publicação e assinatura
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
exemplo Exemplo: assinar mensagens locais de publicação e assinatura
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
exemplo Exemplo: assinar mensagens locais de publicação e assinatura
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

Práticas recomendadas do IPC

As melhores práticas para uso IPC em componentes personalizados diferem entre o IPC cliente V1 e o IPC cliente V2. Siga as melhores práticas para a versão IPC do cliente que você usa.

IPC client V2

O IPC cliente V2 executa funções de retorno de chamada em um thread separado, portanto, em comparação com o IPC cliente V1, há menos diretrizes a serem seguidas ao usar IPC e escrever funções de manipulador de assinaturas.

  • Reutilize um cliente IPC

    Depois de criar um IPC cliente, mantenha-o aberto e reutilize-o em todas as IPC operações. A criação de vários clientes usa recursos extras e pode resultar em vazamentos de recursos.

  • Processar exceções

    O IPC cliente V2 registra exceções não detectadas nas funções do manipulador de assinaturas. Você deve capturar exceções nas funções do manipulador para lidar com erros que ocorrem no seu código.

IPC client V1

O IPC cliente V1 usa um único thread que se comunica com o IPC servidor e chama os manipuladores de assinatura. Você deve considerar esse comportamento síncrono ao escrever funções de manipulador de assinaturas.

  • Reutilize um cliente IPC

    Depois de criar um IPC cliente, mantenha-o aberto e reutilize-o em todas as IPC operações. A criação de vários clientes usa recursos extras e pode resultar em vazamentos de recursos.

  • Executar o código de bloqueio de forma assíncrona

    O IPC cliente V1 não pode enviar novas solicitações ou processar novas mensagens de eventos enquanto o thread está bloqueado. Você deve executar o código de bloqueio em um encadeamento separado, executado a partir da função de manipulador. O código de bloqueio inclui chamadas sleep, loops que são executados continuamente e solicitações de E/S síncronas que demoram para serem concluídas.

  • Enviar novas IPC solicitações de forma assíncrona

    O IPC cliente V1 não pode enviar uma nova solicitação de dentro das funções do manipulador de assinatura, porque a solicitação bloqueia a função do manipulador se você esperar por uma resposta. Você deve enviar IPC solicitações em um thread separado que você executa a partir da função de manipulador.

  • Processar exceções

    O IPC cliente V1 não lida com exceções não detectadas nas funções do manipulador de assinaturas. Se sua função de manipulador gerar uma exceção, a assinatura será encerrada e a exceção não aparecerá nos logs do componente. Você deve capturar exceções nas funções do manipulador para manter a assinatura aberta e com erros que ocorrem no seu código.