Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Mit Publish/Sub-Nachrichten (Pubsub) können Sie Nachrichten zu Themen senden und empfangen. Komponenten können Nachrichten zu Themen veröffentlichen, um Nachrichten an andere Komponenten zu senden. Komponenten, die dieses Thema abonniert haben, können dann auf die Nachrichten reagieren, die sie erhalten.
Anmerkung
Sie können diesen IPC-Dienst zum Veröffentlichen/Abonnieren nicht verwenden, um MQTT zu veröffentlichen oder zu abonnieren. AWS IoT Core Weitere Informationen zum Austausch von Nachrichten mit MQTT finden Sie unter. AWS IoT Core MQTT-Nachrichten veröffentlichen/abonnieren AWS IoT Core
Minimale SDK-Versionen
In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie verwenden müssen, um Nachrichten zu und von lokalen Themen zu veröffentlichen und zu abonnieren.
SDK | Mindestversion |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
Autorisierung
Um lokale Veröffentlichungs-/Abonnementnachrichten in einer benutzerdefinierten Komponente zu verwenden, müssen Sie Autorisierungsrichtlinien definieren, die es Ihrer Komponente ermöglichen, Nachrichten an Themen zu senden und zu empfangen. Informationen zur Definition von Autorisierungsrichtlinien finden Sie unter. Autorisieren Sie Komponenten zur Ausführung von IPC-Vorgängen
Autorisierungsrichtlinien für Publish/Subscribe-Messaging haben die folgenden Eigenschaften.
IPC-Dienst-ID: aws.greengrass.ipc.pubsub
Operation | Beschreibung | Ressourcen |
---|---|---|
|
Ermöglicht einer Komponente, Nachrichten zu den von Ihnen angegebenen Themen zu veröffentlichen. |
Eine Themenzeichenfolge, z. Diese Themenzeichenfolge unterstützt keine Platzhalter ( |
|
Ermöglicht einer Komponente, Nachrichten für die von Ihnen angegebenen Themen zu abonnieren. |
Eine Themenzeichenfolge, z. In Greengrass Nucleus v2.6.0 und höher können Sie Themen abonnieren, die Platzhalter (und) für MQTT-Themen enthalten. |
|
Ermöglicht einer Komponente, Nachrichten zu den von Ihnen angegebenen Themen zu veröffentlichen und zu abonnieren. |
Eine Themenzeichenfolge, z. In Greengrass Nucleus v2.6.0 und höher können Sie Themen abonnieren, die Platzhalter (und) für MQTT-Themen enthalten. |
Beispiele für Autorisierungsrichtlinien
Anhand des folgenden Beispiels für Autorisierungsrichtlinien können Sie Autorisierungsrichtlinien für Ihre Komponenten konfigurieren.
Beispiel für eine Autorisierungsrichtlinie
Das folgende Beispiel für eine Autorisierungsrichtlinie ermöglicht es einer Komponente, alle Themen zu veröffentlichen und zu abonnieren.
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.MyLocalPubSubComponent
:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
PublishToTopic
Veröffentlichen einer Nachricht für ein Thema.
Anforderung
Die Anforderung dieses Vorgangs hat die folgenden Parameter:
topic
-
Das Thema, zu dem die Nachricht veröffentlicht werden soll.
publishMessage
(Python:publish_message
)-
Die zu veröffentlichende Nachricht. Dieses Objekt,
PublishMessage
, enthält die folgenden Informationen. Sie müssen einen vonjsonMessage
und angebenbinaryMessage
.jsonMessage
(Python:json_message
)-
(Optional) Eine JSON-Nachricht. Dieses Objekt,
JsonMessage
, enthält die folgenden Informationen:message
-
Die JSON-Nachricht als Objekt.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
binaryMessage
(Python:binary_message
)-
(Optional) Eine binäre Nachricht. Dieses Objekt,
BinaryMessage
, enthält die folgenden Informationen:message
-
Die binäre Nachricht als Blob.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
Antwort
Dieser Vorgang liefert in seiner Antwort keine Informationen.
Beispiele
Die folgenden Beispiele zeigen, wie dieser Vorgang in benutzerdefiniertem Komponentencode aufgerufen wird.
Beispiel: Veröffentlichen Sie eine binäre Nachricht
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import java.nio.charset.StandardCharsets;
public class PublishToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponse publishBinaryMessageToTopic(
GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
BinaryMessage binaryMessage =
new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
PublishToTopicRequest publishToTopicRequest =
new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
return ipcClient.publishToTopic(publishToTopicRequest);
}
}
SubscribeToTopic
Abonnieren Sie Nachrichten zu einem Thema.
Bei diesem Vorgang handelt es sich um einen Abonnementvorgang, bei dem Sie einen Stream von Ereignisnachrichten abonnieren. Um diese Operation zu verwenden, definieren Sie einen Stream-Response-Handler mit Funktionen, die Ereignismeldungen, Fehler und das Schließen von Streams behandeln. Weitere Informationen finden Sie unter Abonnieren Sie IPC-Event-Streams.
Typ der Ereignisnachricht: SubscriptionResponseMessage
Anforderung
Die Anforderung dieser Operation hat die folgenden Parameter:
topic
-
Das Thema, das abonniert werden soll.
Anmerkung
In Greengrass Nucleus v2.6.0 und höher unterstützt dieses Thema Platzhalter (und) für MQTT-Themen.
#
+
receiveMode
(Python:receive_mode
)-
(Optional) Das Verhalten, das angibt, ob die Komponente Nachrichten von sich selbst empfängt. Sie können dieses Verhalten ändern, damit eine Komponente auf ihre eigenen Nachrichten reagieren kann. Das Standardverhalten hängt davon ab, ob das Thema einen MQTT-Platzhalter enthält. Wählen Sie aus den folgenden Optionen aus:
-
RECEIVE_ALL_MESSAGES
— Empfangen Sie alle Nachrichten, die dem Thema entsprechen, einschließlich Nachrichten von der Komponente, die das Abonnement abonniert.Dieser Modus ist die Standardoption, wenn Sie ein Thema abonnieren, das keinen MQTT-Platzhalter enthält.
-
RECEIVE_MESSAGES_FROM_OTHERS
— Empfangen Sie alle Nachrichten, die dem Thema entsprechen, mit Ausnahme von Nachrichten von der Komponente, die das Abonnement abonniert.Dieser Modus ist die Standardoption, wenn Sie ein Thema abonnieren, das einen MQTT-Platzhalter enthält.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie verwenden müssen, um den Empfangsmodus einzustellen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
-
Antwort
Die Antwort dieser Operation enthält die folgenden Informationen:
messages
-
Der Nachrichtenstrom. Dieses Objekt,
SubscriptionResponseMessage
, enthält die folgenden Informationen. Jede Nachricht enthältjsonMessage
oderbinaryMessage
.jsonMessage
(Python:json_message
)-
(Optional) Eine JSON-Nachricht. Dieses Objekt,
JsonMessage
, enthält die folgenden Informationen:message
-
Die JSON-Nachricht als Objekt.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
binaryMessage
(Python:binary_message
)-
(Optional) Eine binäre Nachricht. Dieses Objekt,
BinaryMessage
, enthält die folgenden Informationen:message
-
Die binäre Nachricht als Blob.
context
-
Der Kontext der Nachricht, z. B. das Thema, in dem die Nachricht veröffentlicht wurde.
Diese Funktion ist für Version 2.6.0 und höher der Greengrass Nucleus-Komponente verfügbar. In der folgenden Tabelle sind die Mindestversionen von aufgeführt AWS IoT Device SDK , die Sie für den Zugriff auf den Nachrichtenkontext verwenden müssen.
SDK Mindestversion v1.9.3
v1.11.3
v1.18.4
v1.12.0
Anmerkung
Die AWS IoT Greengrass Core-Software verwendet dieselben Nachrichtenobjekte in den Operationen
PublishToTopic
undSubscribeToTopic
. Die AWS IoT Greengrass Core-Software legt dieses Kontextobjekt in Nachrichten fest, wenn Sie sie abonnieren, und ignoriert dieses Kontextobjekt in Nachrichten, die Sie veröffentlichen.Dieses Objekt,
MessageContext
, enthält die folgenden Informationen:topic
-
Das Thema, in dem die Nachricht veröffentlicht wurde.
topicName
(Python:topic_name
)-
Das Thema, zu dem die Nachricht veröffentlicht wurde.
Anmerkung
Diese Eigenschaft wird derzeit nicht verwendet. In Greengrass Nucleus v2.6.0 und höher können Sie den
(jsonMessage|binaryMessage).context.topic
Wert von a abrufen,SubscriptionResponseMessage
um das Thema abzurufen, in dem die Nachricht veröffentlicht wurde.
Beispiele
Die folgenden Beispiele zeigen, wie dieser Vorgang in benutzerdefiniertem Komponentencode aufgerufen wird.
Beispiel: Lokale Nachrichten zum Publizieren/Abonnieren abonnieren
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
Beispiele
Anhand der folgenden Beispiele erfahren Sie, wie Sie den Dienst Publish/Subscribe IPC in Ihren Komponenten verwenden können.
Das folgende Beispielrezept ermöglicht es der Komponente, zu allen Themen zu veröffentlichen.
Die folgende Java-Beispielanwendung zeigt, wie der Publish/Subscribe-IPC-Dienst verwendet wird, um Nachrichten in anderen Komponenten zu veröffentlichen.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
public static void main(String[] args) {
String message = "Hello from the pub/sub publisher (Java).";
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
while (true) {
PublishToTopicRequest publishRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishRequest.setPublishMessage(publishMessage);
publishRequest.setTopic(topic);
CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
.publishToTopic(publishRequest, Optional.empty()).getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Execution exception while publishing to topic: " + topic);
}
throw e;
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
System.out.println("Publisher interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
}
Das folgende Beispielrezept ermöglicht es der Komponente, alle Themen zu abonnieren.
Die folgende Java-Beispielanwendung zeigt, wie der Publish/Subscribe-IPC-Dienst verwendet wird, um Nachrichten für andere Komponenten zu abonnieren.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubSubscriber {
public static void main(String[] args) {
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
subscribeRequest.setTopic(topic);
SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
.subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
System.err.println("Execution exception while subscribing to topic: " + topic);
}
throw e;
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message = new String(subscriptionResponseMessage.getBinaryMessage()
.getMessage(), StandardCharsets.UTF_8);
System.out.println("Received new message: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
Das folgende Beispielrezept ermöglicht es der Komponente, zu allen Themen zu veröffentlichen.
Die folgende Python-Beispielanwendung zeigt, wie der Publish/Subscribe-IPC-Dienst verwendet wird, um Nachrichten in anderen Komponenten zu veröffentlichen.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
UnauthorizedError
)
topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10
try:
ipc_client = awsiot.greengrasscoreipc.connect()
while True:
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully published to topic: ' + topic)
except concurrent.futures.TimeoutError:
print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
except UnauthorizedError as e:
print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while publishing to topic: ' + topic, file=sys.stderr)
raise e
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
Das folgende Beispielrezept ermöglicht es der Komponente, alle Themen zu abonnieren.
Die folgende Python-Beispielanwendung zeigt, wie der Publish/Subscribe-IPC-Dienst verwendet wird, um Nachrichten für andere Komponenten zu abonnieren.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage,
UnauthorizedError
)
topic = "test/topic/python"
TIMEOUT = 10
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, "utf-8")
print("Received new message: " + message)
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
print("Received a stream error.", file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
print('Subscribe to topic stream closed.')
try:
ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully subscribed to topic: ' + topic)
except concurrent.futures.TimeoutError as e:
print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except UnauthorizedError as e:
print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
raise e
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
Das folgende Beispielrezept ermöglicht es der Komponente, in allen Themen zu veröffentlichen.
Die folgende C++-Beispielanwendung zeigt, wie der IPC-Dienst Publish/Subscribe verwendet wird, um Nachrichten in anderen Komponenten zu veröffentlichen.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the pub/sub publisher (C++).");
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
Das folgende Beispielrezept ermöglicht es der Komponente, alle Themen zu abonnieren.
Die folgende C++-Beispielanwendung zeigt, wie der Publish/Subscribe-IPC-Dienst verwendet wird, um Nachrichten für andere Komponenten zu abonnieren.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
std::cout << "Received new message: " << messageString << std::endl;
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::cout << "Received new message: " << messageString << std::endl;
}
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to topic stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToTopicRequest request;
request.SetTopic(topic);
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}