Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Lokale Nachrichten veröffentlichen/abonnieren
Mit Publish/Sub-Nachrichten (Pubsub) können Sie Nachrichten zu Themen senden und empfangen. Komponenten können Nachrichten zu Themen veröffentlichen, um Nachrichten an andere Komponenten zu senden. Komponenten, die dieses Thema abonniert haben, können dann auf die Nachrichten reagieren, die sie erhalten.
Anmerkung
Sie können diesen IPC Dienst zum Veröffentlichen oder Abonnieren nicht zum Veröffentlichen oder Abonnieren verwenden. AWS IoT Core MQTT Weitere Informationen zum Austausch von Nachrichten mit AWS IoT Core MQTT finden Sie unter. Nachrichten veröffentlichen/abonnieren AWS IoT Core MQTT
SDKMindestversionen
In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie verwenden müssen, um Nachrichten zu und von lokalen Themen zu veröffentlichen und zu abonnieren.
SDK | Mindestversion |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
Autorisierung
Um lokale Veröffentlichungs-/Abonnementnachrichten in einer benutzerdefinierten Komponente zu verwenden, müssen Sie Autorisierungsrichtlinien definieren, die es Ihrer Komponente ermöglichen, Nachrichten an Themen zu senden und zu empfangen. Informationen zur Definition von Autorisierungsrichtlinien finden Sie unter. Autorisieren Sie Komponenten zur Ausführung von Vorgängen IPC
Autorisierungsrichtlinien für Publish/Subscribe-Messaging haben die folgenden Eigenschaften.
IPCDienst-ID: aws.greengrass.ipc.pubsub
Operation | Beschreibung | Ressourcen |
---|---|---|
|
Ermöglicht einer Komponente, Nachrichten zu den von Ihnen angegebenen Themen zu veröffentlichen. |
Eine Themenzeichenfolge, z. Diese Themenzeichenfolge unterstützt keine MQTT Themen-Platzhalter ( |
|
Ermöglicht einer Komponente, Nachrichten zu den von Ihnen angegebenen Themen zu abonnieren. |
Eine Themenzeichenfolge, z. In Greengrass Nucleus v2.6.0 und höher können Sie Themen abonnieren, die Themen-Platzhalter (und) enthaltenMQTT. |
|
Ermöglicht einer Komponente, Nachrichten zu den von Ihnen angegebenen Themen zu veröffentlichen und zu abonnieren. |
Eine Themenzeichenfolge, z. In Greengrass Nucleus v2.6.0 und höher können Sie Themen abonnieren, die Themen-Platzhalter (und) enthaltenMQTT. |
Beispiele für Autorisierungsrichtlinien
Anhand des folgenden Beispiels für eine Autorisierungsrichtlinie können Sie Autorisierungsrichtlinien für Ihre Komponenten konfigurieren.
Beispiel für eine Autorisierungsrichtlinie
Das folgende Beispiel für eine Autorisierungsrichtlinie ermöglicht es einer Komponente, alle Themen zu veröffentlichen und zu abonnieren.
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "
com.example.MyLocalPubSubComponent
:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }
PublishToTopic
Veröffentlichen einer Nachricht für ein Thema.
Anforderung
Die Anforderung dieses Vorgangs hat die folgenden Parameter:
topic
-
Das Thema, zu dem die Nachricht veröffentlicht werden soll.
publishMessage
(Python:publish_message
)-
Die zu veröffentlichende Nachricht. Dieses Objekt,
PublishMessage
, enthält die folgenden Informationen. Sie müssen einen vonjsonMessage
und angebenbinaryMessage
.jsonMessage
(Python:json_message
)-
(Optional) Eine JSON Nachricht. Dieses Objekt,
JsonMessage
, enthält die folgenden Informationen:message
-
Die JSON Nachricht als Objekt.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
binaryMessage
(Python:binary_message
)-
(Optional) Eine binäre Nachricht. Dieses Objekt,
BinaryMessage
, enthält die folgenden Informationen:message
-
Die binäre Nachricht als Blob.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
Antwort
Dieser Vorgang liefert in seiner Antwort keine Informationen.
Beispiele
Die folgenden Beispiele zeigen, wie dieser Vorgang in benutzerdefiniertem Komponentencode aufgerufen wird.
SubscribeToTopic
Abonnieren Sie Nachrichten zu einem Thema.
Bei diesem Vorgang handelt es sich um einen Abonnementvorgang, bei dem Sie einen Stream von Ereignisnachrichten abonnieren. Um diese Operation zu verwenden, definieren Sie einen Stream-Response-Handler mit Funktionen, die Ereignismeldungen, Fehler und das Schließen von Streams behandeln. Weitere Informationen finden Sie unter Abonnieren Sie IPC Event-Streams.
Typ der Ereignisnachricht: SubscriptionResponseMessage
Anforderung
Die Anforderung dieses Vorgangs hat die folgenden Parameter:
topic
-
Das Thema, das abonniert werden soll.
Anmerkung
In Greengrass Nucleus v2.6.0 und höher unterstützt MQTT dieses Thema Themen-Platzhalter (und).
#
+
receiveMode
(Python:receive_mode
)-
(Optional) Das Verhalten, das angibt, ob die Komponente Nachrichten von sich selbst empfängt. Sie können dieses Verhalten ändern, damit eine Komponente auf ihre eigenen Nachrichten reagieren kann. Das Standardverhalten hängt davon ab, ob das Thema einen MQTT Platzhalter enthält. Wählen Sie aus den folgenden Optionen aus:
-
RECEIVE_ALL_MESSAGES
— Empfangen Sie alle Nachrichten, die dem Thema entsprechen, einschließlich Nachrichten von der Komponente, die das Abonnement abonniert.Dieser Modus ist die Standardoption, wenn Sie ein Thema abonnieren, das keinen MQTT Platzhalter enthält.
-
RECEIVE_MESSAGES_FROM_OTHERS
— Empfangen Sie alle Nachrichten, die dem Thema entsprechen, mit Ausnahme von Nachrichten von der Komponente, die das Abonnement abonniert.Dieser Modus ist die Standardoption, wenn Sie ein Thema abonnieren, das einen MQTT Platzhalter enthält.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie verwenden müssen, um den Empfangsmodus einzustellen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
-
Antwort
Die Antwort dieser Operation enthält die folgenden Informationen:
messages
-
Der Nachrichtenstrom. Dieses Objekt,
SubscriptionResponseMessage
, enthält die folgenden Informationen. Jede Nachricht enthältjsonMessage
oderbinaryMessage
.jsonMessage
(Python:json_message
)-
(Optional) Eine JSON Nachricht. Dieses Objekt,
JsonMessage
, enthält die folgenden Informationen:message
-
Die JSON Nachricht als Objekt.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
binaryMessage
(Python:binary_message
)-
(Optional) Eine binäre Nachricht. Dieses Objekt,
BinaryMessage
, enthält die folgenden Informationen:message
-
Die binäre Nachricht als Blob.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
topicName
(Python:topic_name
)-
Das Thema, zu dem die Nachricht veröffentlicht wurde.
Anmerkung
Diese Eigenschaft wird derzeit nicht verwendet. In Greengrass Nucleus v2.6.0 und höher können Sie den
(jsonMessage|binaryMessage).context.topic
Wert von a abrufen,SubscriptionResponseMessage
um das Thema abzurufen, in dem die Nachricht veröffentlicht wurde.
Beispiele
Die folgenden Beispiele zeigen, wie dieser Vorgang in benutzerdefiniertem Komponentencode aufgerufen wird.
Beispiele
Anhand der folgenden Beispiele erfahren Sie, wie Sie den Dienst zum Veröffentlichen und Abonnieren in Ihren Komponenten verwenden können. IPC
Das folgende Beispielrezept ermöglicht es der Komponente, zu allen Themen zu veröffentlichen.
Die folgende Java-Beispielanwendung zeigt, wie der IPC Publish/Subscribe-Dienst verwendet wird, um Nachrichten in anderen Komponenten zu veröffentlichen.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }
Das folgende Beispielrezept ermöglicht es der Komponente, alle Themen zu abonnieren.
Die folgende Java-Beispielanwendung zeigt, wie der IPC Publish/Subscribe-Dienst verwendet wird, um Nachrichten für andere Komponenten zu abonnieren.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
Das folgende Beispielrezept ermöglicht es der Komponente, zu allen Themen zu veröffentlichen.
Die folgende Python-Beispielanwendung zeigt, wie der IPC Publish/Subscribe-Dienst verwendet wird, um Nachrichten in anderen Komponenten zu veröffentlichen.
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
Das folgende Beispielrezept ermöglicht es der Komponente, alle Themen zu abonnieren.
Die folgende Python-Beispielanwendung zeigt, wie der IPC Publish/Subscribe-Dienst verwendet wird, um Nachrichten für andere Komponenten zu abonnieren.
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
Das folgende Beispielrezept ermöglicht es der Komponente, in allen Themen zu veröffentlichen.
Die folgende C++-Beispielanwendung zeigt, wie der IPC Publish/Subscribe-Dienst verwendet wird, um Nachrichten in anderen Komponenten zu veröffentlichen.
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
Das folgende Beispielrezept ermöglicht es der Komponente, alle Themen zu abonnieren.
Die folgende C++-Beispielanwendung zeigt, wie der IPC Publish/Subscribe-Dienst verwendet wird, um Nachrichten für andere Komponenten zu abonnieren.
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }