本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
发布/订阅 (pubsub) 消息收发允许您向主题发送和接收消息。组件可以向主题发布消息,以将消息发送给其他组件。然后,订阅该主题的组件可以对它们收到的消息进行操作。
注意
您不能使用此发布/订阅 IPC 服务来发布或订阅 AWS IoT Core MQTT。有关如何使用 AWS IoT Core MQTT 交换消息的更多信息,请参阅发布/订阅 AWS IoT Core MQTT 消息。
最低 SDK 版本
下表列出了在向本地主题发布和订阅消息时必须使用的最低版本。 AWS IoT Device SDK
SDK | 最低版本 |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
授权
要在自定义组件中使用本地发布/订阅消息收发,必须定义允许您的组件向主题发送和接收消息的授权策略。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作。
发布/订阅消息收发的授权策略具有以下属性。
IPC 服务标识符:aws.greengrass.ipc.pubsub
操作 | 描述 | 资源 |
---|---|---|
|
允许组件向您指定的主题发布消息。 |
主题字符串,例如 此主题字符串不支持 MQTT 主题通配符( |
|
允许组件订阅您指定主题的消息。 |
主题字符串,例如 在 Greengrass Nucleus v2.6.0 及更高版本中,您可以订阅包含 MQTT 主题通配符( |
|
允许组件为您指定的主题发布和订阅消息。 |
主题字符串,例如 在 Greengrass Nucleus v2.6.0 及更高版本中,您可以订阅包含 MQTT 主题通配符( |
授权策略示例
您可以参考以下授权策略示例,帮助您为组件配置授权策略。
例 示例授权策略
以下示例授权策略允许组件发布和订阅所有主题。
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.MyLocalPubSubComponent
:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
PublishToTopic
向主题发布消息。
请求
此操作的请求包含以下参数:
topic
-
要向其发布消息的主题。
publishMessage
(Python:publish_message
)-
待发布的消息。该对象
PublishMessage
包含以下信息。您必须指定jsonMessage
和binaryMessage
中的一个。jsonMessage
(Python:json_message
)-
(可选)一条 JSON 消息。该对象
JsonMessage
包含以下信息:message
-
作为对象的 JSON 消息。
context
-
消息的上下文,例如消息发布的主题。
此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 AWS IoT Device SDK 版本。
SDK 最低版本 v1.9.3
v1.11.3
v1.18.4
v1.12.0
注意
C AWS IoT Greengrass ore 软件在
PublishToTopic
和SubscribeToTopic
操作中使用相同的消息对象。当您订阅时, AWS IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。该对象
MessageContext
包含以下信息:topic
-
消息发布的主题。
binaryMessage
(Python:binary_message
)-
(可选)二进制消息。该对象
BinaryMessage
包含以下信息:message
-
以 blob 形式呈现的二进制消息。
context
-
消息的上下文,例如消息发布的主题。
此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 AWS IoT Device SDK 版本。
SDK 最低版本 v1.9.3
v1.11.3
v1.18.4
v1.12.0
注意
C AWS IoT Greengrass ore 软件在
PublishToTopic
和SubscribeToTopic
操作中使用相同的消息对象。当您订阅时, AWS IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。该对象
MessageContext
包含以下信息:topic
-
消息发布的主题。
响应
此操作在其响应中未提供任何信息。
示例
以下示例演示了如何在自定义组件代码中调用该操作。
例 示例:发布二进制消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import java.nio.charset.StandardCharsets;
public class PublishToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponse publishBinaryMessageToTopic(
GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
BinaryMessage binaryMessage =
new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
PublishToTopicRequest publishToTopicRequest =
new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
return ipcClient.publishToTopic(publishToTopicRequest);
}
}
SubscribeToTopic
订阅有关某个主题的消息。
此操作是一种订阅操作,您可以在其中订阅事件消息流。要使用此操作,请定义一个流响应处理程序,其中包含处理事件消息、错误和流关闭的函数。有关更多信息,请参阅 订阅 IPC 事件流。
事件消息类型:SubscriptionResponseMessage
请求
此操作的请求包含以下参数:
topic
-
要订阅的主题。
注意
在 Greengrass Nucleus v2.6.0 及更高版本中,本主题支持 MQTT 主题通配符(
#
和+
)。 receiveMode
(Python:receive_mode
)-
(可选)指定组件是否从自身接收消息的行为。您可以更改此行为以允许组件根据自己的消息进行操作。默认行为取决于主题是否包含 MQTT 通配符。从以下选项中进行选择:
-
RECEIVE_ALL_MESSAGES
– 接收与该主题匹配的所有消息,包括来自订阅组件的消息。当您订阅不包含 MQTT 通配符的主题时,此模式为默认选项。
-
RECEIVE_MESSAGES_FROM_OTHERS
– 接收与该主题匹配的所有消息,不包括来自订阅组件的消息。当您订阅包含 MQTT 通配符的主题时,此模式为默认选项。
此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了在设置接收模式时 AWS IoT Device SDK 必须使用的最低版本。
SDK 最低版本 v1.9.3
v1.11.3
v1.18.4
v1.12.0
-
响应
此操作的响应包含以下信息:
messages
-
消息流。该对象
SubscriptionResponseMessage
包含以下信息。每条消息都包含jsonMessage
或binaryMessage
。jsonMessage
(Python:json_message
)-
(可选)一条 JSON 消息。该对象
JsonMessage
包含以下信息:message
-
作为对象的 JSON 消息。
context
-
消息的上下文,例如消息发布的主题。
此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 AWS IoT Device SDK 版本。
SDK 最低版本 v1.9.3
v1.11.3
v1.18.4
v1.12.0
注意
C AWS IoT Greengrass ore 软件在
PublishToTopic
和SubscribeToTopic
操作中使用相同的消息对象。当您订阅时, AWS IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。该对象
MessageContext
包含以下信息:topic
-
消息发布的主题。
binaryMessage
(Python:binary_message
)-
(可选)二进制消息。该对象
BinaryMessage
包含以下信息:message
-
以 blob 形式呈现的二进制消息。
context
-
消息的上下文,例如消息发布的主题。
此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 AWS IoT Device SDK 版本。
SDK 最低版本 v1.9.3
v1.11.3
v1.18.4
v1.12.0
注意
C AWS IoT Greengrass ore 软件在
PublishToTopic
和SubscribeToTopic
操作中使用相同的消息对象。当您订阅时, AWS IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。该对象
MessageContext
包含以下信息:topic
-
消息发布的主题。
topicName
(Python:topic_name
)-
消息被发布到的主题。
注意
目前尚未使用该属性。在 Greengrass Nucleus v2.6.0 及更高版本中,您可以从
SubscriptionResponseMessage
中获取(jsonMessage|binaryMessage).context.topic
值以获取消息发布的主题。
示例
以下示例演示了如何在自定义组件代码中调用该操作。
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
示例
使用以下示例了解如何在组件中使用发布/订阅 IPC 服务。
以下示例配方允许该组件发布至所有主题。
以下示例 Java 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件发布消息。
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
public static void main(String[] args) {
String message = "Hello from the pub/sub publisher (Java).";
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
while (true) {
PublishToTopicRequest publishRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishRequest.setPublishMessage(publishMessage);
publishRequest.setTopic(topic);
CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
.publishToTopic(publishRequest, Optional.empty()).getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Execution exception while publishing to topic: " + topic);
}
throw e;
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
System.out.println("Publisher interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
}
以下示例配方允许该组件订阅所有主题。
以下示例 Java 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件订阅消息。
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubSubscriber {
public static void main(String[] args) {
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
subscribeRequest.setTopic(topic);
SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
.subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
System.err.println("Execution exception while subscribing to topic: " + topic);
}
throw e;
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message = new String(subscriptionResponseMessage.getBinaryMessage()
.getMessage(), StandardCharsets.UTF_8);
System.out.println("Received new message: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
以下示例配方允许该组件发布至所有主题。
以下示例 Python 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件发布消息。
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
UnauthorizedError
)
topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10
try:
ipc_client = awsiot.greengrasscoreipc.connect()
while True:
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully published to topic: ' + topic)
except concurrent.futures.TimeoutError:
print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
except UnauthorizedError as e:
print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while publishing to topic: ' + topic, file=sys.stderr)
raise e
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
以下示例配方允许该组件订阅所有主题。
以下示例 Python 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件订阅消息。
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage,
UnauthorizedError
)
topic = "test/topic/python"
TIMEOUT = 10
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, "utf-8")
print("Received new message: " + message)
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
print("Received a stream error.", file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
print('Subscribe to topic stream closed.')
try:
ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully subscribed to topic: ' + topic)
except concurrent.futures.TimeoutError as e:
print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except UnauthorizedError as e:
print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
raise e
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
以下示例配方允许该组件发布至所有主题。
以下示例 C++ 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件发布消息。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the pub/sub publisher (C++).");
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
以下示例配方允许该组件订阅所有主题。
以下示例 C++ 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件订阅消息。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
std::cout << "Received new message: " << messageString << std::endl;
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::cout << "Received new message: " << messageString << std::endl;
}
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to topic stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToTopicRequest request;
request.SetTopic(topic);
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}