ローカルメッセージをパブリッシュ/サブスクライブする
パブリッシュ/サブスクライブ (pubsub) メッセージを使用すると、トピックにメッセージを送受信できます。コンポーネントはメッセージをトピックに発行して、他のコンポーネントにメッセージを送信できます。その後、そのトピックをサブスクライブしているコンポーネントは、受信したメッセージに対応できます。
最小 SDK バージョン
以下の表に、ローカルトピックとメッセージの発行およびサブスクライブのやりとりを行う際に使用が必要となる AWS IoT Device SDK の最小バージョンを示します。
認可
ローカルのパブリッシュ/サブスクライブメッセージングをカスタムコンポーネントで使用するには、コンポーネントがトピックにメッセージを送受信できるようにする承認ポリシーを定義する必要があります。承認ポリシーの定義については、「コンポーネントに IPC オペレーションの実行を許可する」を参照してください。
パブリッシュ/サブスクライブメッセージングの承認ポリシーには以下のプロパティがあります。
IPC サービス識別子: aws.greengrass.ipc.pubsub
操作 |
説明 |
リソース |
aws.greengrass#PublishToTopic
|
コンポーネントが指定したトピックにメッセージを発行できるようにします。
|
test/topic などのトピック文字列。トピック内の任意の文字の組み合わせに一致させるには * を使用します。
このトピック文字列は、MQTT トピックのワイルドカード (# および + ) をサポートしません。
|
aws.greengrass#SubscribeToTopic
|
コンポーネントが、指定したトピックに関するメッセージをサブスクライブできるようにします。
|
test/topic などのトピック文字列。トピック内の任意の文字の組み合わせに一致させるには * を使用します。
Greengrass nucleus v2.6.0 以降では、MQTT トピックワイルドカード (# および + ) を含むトピックをサブスクライブできます。このトピック文字列は MQTT トピックのワイルドカードを文字そのものとしてサポートします。例えば、コンポーネントの承認ポリシーで test/topic/# へのアクセス権が付与されている場合、コンポーネントは test/topic/# をサブスクライブできますが、test/topic/filter はサブスクライブできません。
|
*
|
コンポーネントが、指定したトピックのメッセージを発行およびサブスクライブできるようにします。
|
test/topic などのトピック文字列。トピック内の任意の文字の組み合わせに一致させるには * を使用します。
Greengrass nucleus v2.6.0 以降では、MQTT トピックワイルドカード (# および + ) を含むトピックをサブスクライブできます。このトピック文字列は MQTT トピックのワイルドカードを文字そのものとしてサポートします。例えば、コンポーネントの承認ポリシーで test/topic/# へのアクセス権が付与されている場合、コンポーネントは test/topic/# をサブスクライブできますが、test/topic/filter はサブスクライブできません。
|
承認ポリシーの例
次の承認ポリシーの例を参照して、コンポーネントの承認ポリシーの設定に役立てることができます。
例 承認ポリシーの例
以下の承認ポリシーの例では、コンポーネントがすべてのトピックを公開およびサブスクライブすることを許可します。
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.MyLocalPubSubComponent
:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
PublishToTopic
トピックへのメッセージの発行
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topic
-
メッセージの発行先として指定するトピック。
publishMessage
(Python: publish_message
)
-
発行するメッセージ。このオブジェクト (PublishMessage
) には、次の情報が含まれます。jsonMessage
と binaryMessage
のどちらか 1 つを指定する必要があります。
jsonMessage
(Python: json_message
)
-
(オプション) JSON メッセージ。このオブジェクト (JsonMessage
) には、次の情報が含まれます。
message
-
オブジェクトとしての JSON メッセージ。
context
-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
AWS IoT Greengrass Core ソフトウェアは PublishToTopic
および SubscribeToTopic
オペレーションで同じメッセージオブジェクトを使用します。AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージ内のこのコンテキストオブジェクトを無視します。
このオブジェクト (MessageContext
) には、次の情報が含まれます。
binaryMessage
(Python: binary_message
)
-
(オプション) バイナリメッセージ。このオブジェクト (BinaryMessage
) には、次の情報が含まれます。
message
-
BLOB としてのバイナリメッセージ。
context
-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
AWS IoT Greengrass Core ソフトウェアは PublishToTopic
および SubscribeToTopic
オペレーションで同じメッセージオブジェクトを使用します。AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージ内のこのコンテキストオブジェクトを無視します。
このオブジェクト (MessageContext
) には、次の情報が含まれます。
レスポンス
このオペレーションはレスポンスで一切の情報を提供しません。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
- Java (IPC client V2)
-
例: バイナリメッセージを発行する
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import java.nio.charset.StandardCharsets;
public class PublishToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponse publishBinaryMessageToTopic(
GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
BinaryMessage binaryMessage =
new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
PublishToTopicRequest publishToTopicRequest =
new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
return ipcClient.publishToTopic(publishToTopicRequest);
}
}
- Python (IPC client V2)
-
例: バイナリメッセージを発行する
import sys
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
PublishMessage,
BinaryMessage
)
def main():
args = sys.argv[1:]
topic = args[0]
message = args[1]
try:
ipc_client = GreengrassCoreIPCClientV2()
publish_binary_message_to_topic(ipc_client, topic, message)
print('Successfully published to topic: ' + topic)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def publish_binary_message_to_topic(ipc_client, topic, message):
binary_message = BinaryMessage(message=bytes(message, 'utf-8'))
publish_message = PublishMessage(binary_message=binary_message)
return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message)
if __name__ == '__main__':
main()
- C++
-
例: バイナリメッセージを発行する
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
String message("Hello, World!");
int timeout = 10;
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
- JavaScript
-
例: バイナリメッセージを発行する
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
class PublishToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
private readonly messageString : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.messageString = "<define_your_message_string>";
this.publishToTopic().then(r => console.log("Started workflow"));
}
private async publishToTopic() {
try {
this.ipcClient = await getIpcClient();
const binaryMessage : BinaryMessage = {
message: this.messageString
}
const publishMessage : PublishMessage = {
binaryMessage: binaryMessage
}
const request : PublishToTopicRequest = {
topic: this.topic,
publishMessage: publishMessage
}
this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const publishToTopic = new PublishToTopic();
SubscribeToTopic
トピックのメッセージをサブスクライブする。
このオペレーションはサブスクリプションオペレーションで、イベントメッセージのストリームをサブスクライブするというものです。このオペレーションを使用するには、イベントメッセージ、エラー、およびストリームクロージャを処理する関数を使用して、ストリームレスポンスハンドラーを定義します。(詳細については、IPC イベントストリームへのサブスクライブ を参照してください)。
イベントメッセージの種類: SubscriptionResponseMessage
リクエスト
このオペレーションのリクエストには以下のパラメータがあります。
topic
-
サブスクライブ先のトピック。
receiveMode
(Python: receive_mode
)
-
(オプション) コンポーネントが自身からのメッセージを受信するかどうかを指定する動作。この動作を変更して、コンポーネントが自身のメッセージに基づいて動作できるようにすることができます。デフォルトの動作は、トピックに MQTT ワイルドカードが含まれているかどうかによって異なります。次のオプションから選択します。
-
RECEIVE_ALL_MESSAGES
– サブスクライブするコンポーネントからのメッセージを含む、トピックに一致するすべてのメッセージを受信します。
このモードは、MQTT ワイルドカードを含まないトピックをサブスクライブする場合のデフォルトオプションです。
-
RECEIVE_MESSAGES_FROM_OTHERS
– サブスクライブするコンポーネントからのメッセージを除き、トピックに一致するすべてのメッセージを受信します。
このモードは、MQTT ワイルドカードを含むトピックをサブスクライブする場合のデフォルトオプションです。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、受信モードの設定に使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
レスポンス
このオペレーションのレスポンスには以下の情報が含まれます。
messages
-
メッセージのストリーム。このオブジェクト (SubscriptionResponseMessage
) には、次の情報が含まれます。各メッセージには jsonMessage
または binaryMessage
が含まれます。
jsonMessage
(Python: json_message
)
-
(オプション) JSON メッセージ。このオブジェクト (JsonMessage
) には、次の情報が含まれます。
message
-
オブジェクトとしての JSON メッセージ。
context
-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
AWS IoT Greengrass Core ソフトウェアは PublishToTopic
および SubscribeToTopic
オペレーションで同じメッセージオブジェクトを使用します。AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージ内のこのコンテキストオブジェクトを無視します。
このオブジェクト (MessageContext
) には、次の情報が含まれます。
binaryMessage
(Python: binary_message
)
-
(オプション) バイナリメッセージ。このオブジェクト (BinaryMessage
) には、次の情報が含まれます。
message
-
BLOB としてのバイナリメッセージ。
context
-
メッセージが公開されたトピックなど、メッセージのコンテキスト。
この機能は、Greengrass nucleus コンポーネントの v2.6.0 以降で利用できます。次の表に、メッセージコンテキストへのアクセスに使用する必要がある AWS IoT Device SDK の最小バージョンを示します。
AWS IoT Greengrass Core ソフトウェアは PublishToTopic
および SubscribeToTopic
オペレーションで同じメッセージオブジェクトを使用します。AWS IoT Greengrass Core ソフトウェアは、サブスクライブ時にこのコンテキストオブジェクトをメッセージに設定し、発行するメッセージ内のこのコンテキストオブジェクトを無視します。
このオブジェクト (MessageContext
) には、次の情報が含まれます。
topicName
(Python: topic_name
)
-
メッセージが発行されたトピック。
このプロパティは現在使用されていません。Greengrass nucleus v2.6.0 以降では、SubscriptionResponseMessage
からの (jsonMessage|binaryMessage).context.topic
の値を取得して、メッセージが発行されたトピックを取得できます。
例
以下の例では、カスタムコンポーネントコードでこのオペレーションを呼び出す方法を示します。
- Java (IPC client V2)
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
例
コンポーネントのパブリッシュ/サブスクライブ IPC サービスの使用方法については、以下の例を参照してください。
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherJava",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherJava:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/PubSubPublisher.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
'com.example.PubSubPublisherJava:pubsub:1':
policyDescription: Allows access to publish to all topics.
operations:
- 'aws.greengrass#PublishToTopic'
resources:
- '*'
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/PubSubPublisher.jar
以下の Java アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
public static void main(String[] args) {
String message = "Hello from the pub/sub publisher (Java).";
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
while (true) {
PublishToTopicRequest publishRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishRequest.setPublishMessage(publishMessage);
publishRequest.setTopic(topic);
CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
.publishToTopic(publishRequest, Optional.empty()).getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Execution exception while publishing to topic: " + topic);
}
throw e;
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
System.out.println("Publisher interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
}
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberJava",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberJava:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/PubSubSubscriber.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
'com.example.PubSubSubscriberJava:pubsub:1':
policyDescription: Allows access to subscribe to all topics.
operations:
- 'aws.greengrass#SubscribeToTopic'
resources:
- '*'
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/PubSubSubscriber.jar
以下の Java アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントのメッセージをサブスクライブする方法を示します。
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubSubscriber {
public static void main(String[] args) {
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
subscribeRequest.setTopic(topic);
SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
.subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
System.err.println("Execution exception while subscribing to topic: " + topic);
}
throw e;
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message = new String(subscriptionResponseMessage.getBinaryMessage()
.getMessage(), StandardCharsets.UTF_8);
System.out.println("Received new message: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherPython",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherPython:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk",
"run": "python3 -u {artifacts:path}/pubsub_publisher.py"
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "py -3 -m pip install --user awsiotsdk",
"run": "py -3 -u {artifacts:path}/pubsub_publisher.py"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherPython
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubPublisherPython:pubsub:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToTopic
resources:
- "*"
Manifests:
- Platform:
os: linux
Lifecycle:
install: python3 -m pip install --user awsiotsdk
run: python3 -u {artifacts:path}/pubsub_publisher.py
- Platform:
os: windows
Lifecycle:
install: py -3 -m pip install --user awsiotsdk
run: py -3 -u {artifacts:path}/pubsub_publisher.py
以下の Python アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
UnauthorizedError
)
topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10
try:
ipc_client = awsiot.greengrasscoreipc.connect()
while True:
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully published to topic: ' + topic)
except concurrent.futures.TimeoutError:
print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
except UnauthorizedError as e:
print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while publishing to topic: ' + topic, file=sys.stderr)
raise e
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberPython",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberPython:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk",
"run": "python3 -u {artifacts:path}/pubsub_subscriber.py"
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "py -3 -m pip install --user awsiotsdk",
"run": "py -3 -u {artifacts:path}/pubsub_subscriber.py"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberPython
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubSubscriberPython:pubsub:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToTopic
resources:
- "*"
Manifests:
- Platform:
os: linux
Lifecycle:
install: python3 -m pip install --user awsiotsdk
run: python3 -u {artifacts:path}/pubsub_subscriber.py
- Platform:
os: windows
Lifecycle:
install: py -3 -m pip install --user awsiotsdk
run: py -3 -u {artifacts:path}/pubsub_subscriber.py
以下の Python アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントのメッセージをサブスクライブする方法を示します。
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage,
UnauthorizedError
)
topic = "test/topic/python"
TIMEOUT = 10
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, "utf-8")
print("Received new message: " + message)
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
print("Received a stream error.", file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
print('Subscribe to topic stream closed.')
try:
ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully subscribed to topic: ' + topic)
except concurrent.futures.TimeoutError as e:
print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except UnauthorizedError as e:
print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
raise e
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
以下の recipe の例は、コンポーネントをすべてのトピックに発行できるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherCpp:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_publisher"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubPublisherCpp:pubsub:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToTopic
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_pubsub_publisher"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher
Permission:
Execute: OWNER
以下の C++ アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントにメッセージを発行する方法を示します。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the pub/sub publisher (C++).");
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
以下の recipe の例は、コンポーネントをすべてのトピックをサブスクライブできるようにします。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberCpp:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pub_sub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubSubscriberCpp:pubsub:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToTopic
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_pub_sub_subscriber"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber
Permission:
Execute: OWNER
以下の C++ アプリケーションの例は、パブリッシュ/サブスクライブ IPC サービスを使用して、他コンポーネントのメッセージをサブスクライブする方法を示します。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
std::cout << "Received new message: " << messageString << std::endl;
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::cout << "Received new message: " << messageString << std::endl;
}
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to topic stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToTopicRequest request;
request.SetTopic(topic);
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}