기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
로컬 메시지 게시/구독
게시/구독(pubsub) 메시징을 사용하면 주제에 메시지를 보내고 주제에서 메시지를 받을 수 있습니다. 구성 요소가 주제에 메시지를 게시하여 다른 구성 요소에 메시지를 보낼 수 있습니다. 그러면 해당 주제를 구독하는 구성 요소가 수신하는 메시지에 대한 작업을 수행할 수 있습니다.
이 게시/구독 IPC 서비스를 사용하여를 게시하거나 구독할 AWS IoT Core 수 없습니다MQTT. 와 메시지를 교환하는 방법에 대한 자세한 내용은 섹션을 AWS IoT Core MQTT참조하세요메시지 게시/구독 AWS IoT Core MQTT .
최소 SDK 버전
다음 표에는 로컬 주제에 메시지를 게시하고 구독하는 데 사용해야 AWS IoT Device SDK 하는 최소 버전의가 나열되어 있습니다.
권한 부여
사용자 지정 구성 요소에서 로컬 게시/구독 메시징을 사용하려면 구성 요소가 주제에 대한 메시지를 보내고 받을 수 있도록 허용하는 권한 부여 정책을 정의해야 합니다. 권한 부여 정책 정의에 대한 자세한 내용은 IPC 작업을 수행하도록 구성 요소 승인 섹션을 참조하세요.
게시/구독 메시징에 대한 권한 부여 정책에는 다음 속성이 있습니다.
IPC 서비스 식별자: aws.greengrass.ipc.pubsub
Operation |
설명 |
리소스 |
aws.greengrass#PublishToTopic
|
구성 요소가 지정한 주제에 메시지를 게시할 수 있도록 허용합니다.
|
test/topic 과 같은 주제 문자열입니다. 주제의 임의의 문자 조합과 일치시키려면 * 를 사용합니다.
이 주제 문자열은 MQTT 주제 와일드카드(# 및 )를 지원하지 않습니다+ .
|
aws.greengrass#SubscribeToTopic
|
구성 요소가 지정하는 주제에 대한 메시지를 구독할 수 있도록 허용합니다.
|
test/topic 과 같은 주제 문자열입니다. 주제의 임의의 문자 조합과 일치시키려면 * 를 사용합니다.
Greengrass nucleus v2.6.0 이상에서는 주제 와일드카드(# 및 + )가 포함된 MQTT 주제를 구독할 수 있습니다. 이 주제 문자열은 MQTT 주제 와일드카드를 리터럴 문자로 지원합니다. 예를 들어 구성 요소의 권한 부여 정책에서 test/topic/# 에 대한 액세스 권한을 부여하는 경우 구성 요소는 test/topic/# 는 구독할 수 있지만 test/topic/filter 는 구독할 수 없습니다.
|
*
|
구성 요소가 지정하는 주제에 대한 메시지를 게시하고 구독할 수 있도록 허용합니다.
|
test/topic 과 같은 주제 문자열입니다. 주제의 임의의 문자 조합과 일치시키려면 * 를 사용합니다.
Greengrass nucleus v2.6.0 이상에서는 주제 와일드카드(# 및 + )가 포함된 MQTT 주제를 구독할 수 있습니다. 이 주제 문자열은 MQTT 주제 와일드카드를 리터럴 문자로 지원합니다. 예를 들어 구성 요소의 권한 부여 정책에서 test/topic/# 에 대한 액세스 권한을 부여하는 경우 구성 요소는 test/topic/# 는 구독할 수 있지만 test/topic/filter 는 구독할 수 없습니다.
|
권한 부여 정책 예제
다음 권한 부여 정책 예제를 참조하면 구성 요소의 권한 부여 정책을 구성하는 데 도움이 됩니다.
예 권한 부여 정책 예제
다음 권한 부여 정책 예제에서는 구성 요소가 모든 주제를 게시하고 구독할 수 있도록 허용합니다.
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.MyLocalPubSubComponent
:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
PublishToTopic
주제에 메시지를 게시합니다.
요청
이 작업의 요청에서는 다음 파라미터를 사용합니다.
topic
-
메시지를 게시할 주제입니다.
publishMessage
(Python: publish_message
)
-
게시할 메시지입니다. 이 객체 PublishMessage
에는 다음 정보가 포함됩니다. jsonMessage
및 binaryMessage
중 하나를 지정해야 합니다.
jsonMessage
(Python: json_message
)
-
(선택 사항) JSON 메시지입니다. 이 객체 JsonMessage
에는 다음 정보가 포함됩니다.
message
-
JSON 메시지를 객체로 표시합니다.
context
-
메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.
이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK 의 최소 버전이 나열되어 있습니다.
AWS IoT Greengrass 코어 소프트웨어는 PublishToTopic
및 SubscribeToTopic
작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass Core 소프트웨어는 구독 시 메시지에서이 컨텍스트 객체를 설정하고 게시하는 메시지에서이 컨텍스트 객체를 무시합니다.
이 객체 MessageContext
에는 다음 정보가 포함됩니다.
binaryMessage
(Python: binary_message
)
-
(선택 사항) 이진 메시지입니다. 이 객체 BinaryMessage
에는 다음 정보가 포함됩니다.
message
-
Blob인 이진 메시지입니다.
context
-
메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.
이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK 의 최소 버전이 나열되어 있습니다.
AWS IoT Greengrass 코어 소프트웨어는 PublishToTopic
및 SubscribeToTopic
작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass Core 소프트웨어는 구독 시 메시지에서이 컨텍스트 객체를 설정하고 게시하는 메시지에서이 컨텍스트 객체를 무시합니다.
이 객체 MessageContext
에는 다음 정보가 포함됩니다.
응답
이 작업의 응답에는 어떠한 정보도 제공하지 않습니다.
예시
다음 예제에서는 사용자 지정 구성 요소 코드에서 이 작업을 직접 호출하는 방법을 보여줍니다.
- Java (IPC client V2)
-
예: 이진 메시지 게시
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.BinaryMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishMessage;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import java.nio.charset.StandardCharsets;
public class PublishToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message);
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static PublishToTopicResponse publishBinaryMessageToTopic(
GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException {
BinaryMessage binaryMessage =
new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8));
PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage);
PublishToTopicRequest publishToTopicRequest =
new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage);
return ipcClient.publishToTopic(publishToTopicRequest);
}
}
- Python (IPC client V2)
-
예: 이진 메시지 게시
import sys
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
PublishMessage,
BinaryMessage
)
def main():
args = sys.argv[1:]
topic = args[0]
message = args[1]
try:
ipc_client = GreengrassCoreIPCClientV2()
publish_binary_message_to_topic(ipc_client, topic, message)
print('Successfully published to topic: ' + topic)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def publish_binary_message_to_topic(ipc_client, topic, message):
binary_message = BinaryMessage(message=bytes(message, 'utf-8'))
publish_message = PublishMessage(binary_message=binary_message)
return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message)
if __name__ == '__main__':
main()
- C++
-
예: 이진 메시지 게시
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
String message("Hello, World!");
int timeout = 10;
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
- JavaScript
-
예: 이진 메시지 게시
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
class PublishToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
private readonly messageString : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.messageString = "<define_your_message_string>";
this.publishToTopic().then(r => console.log("Started workflow"));
}
private async publishToTopic() {
try {
this.ipcClient = await getIpcClient();
const binaryMessage : BinaryMessage = {
message: this.messageString
}
const publishMessage : PublishMessage = {
binaryMessage: binaryMessage
}
const request : PublishToTopicRequest = {
topic: this.topic,
publishMessage: publishMessage
}
this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const publishToTopic = new PublishToTopic();
SubscribeToTopic
주제에 대한 메시지를 구독합니다.
이 작업은 이벤트 메시지 스트림을 구독하는 구독 작업입니다. 이 작업을 사용하려면 이벤트 메시지, 오류 및 스트림 종료를 처리하는 함수를 사용하여 스트림 응답 핸들러를 정의합니다. 자세한 내용은 IPC 이벤트 스트림 구독 단원을 참조하십시오.
이벤트 메시지 유형: SubscriptionResponseMessage
요청
이 작업의 요청에서는 다음 파라미터를 사용합니다.
topic
-
구독할 주제입니다.
receiveMode
(Python: receive_mode
)
-
(선택 사항) 구성 요소가 자체에서 메시지를 수신하는지 여부를 지정하는 동작입니다. 이 동작을 변경하여 구성 요소가 자체 메시지에 대해 조치하도록 허용할 수 있습니다. 기본 동작은 주제에 MQTT 와일드카드가 포함되어 있는지 여부에 따라 달라집니다. 다음 옵션 중 하나를 선택합니다.
-
RECEIVE_ALL_MESSAGES
- 구독하는 구성 요소의 메시지를 포함하여 주제와 일치하는 모든 메시지를 수신합니다.
이 모드는 MQTT 와일드카드가 포함되지 않은 주제를 구독할 때 기본 옵션입니다.
-
RECEIVE_MESSAGES_FROM_OTHERS
- 구독하는 구성 요소의 메시지를 제외하고 주제와 일치하는 모든 메시지를 수신합니다.
MQTT 와일드카드가 포함된 주제를 구독하는 경우이 모드가 기본 옵션입니다.
이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 수신 모드를 설정하는 데 사용해야 AWS IoT Device SDK 하는의 최소 버전이 나열되어 있습니다.
응답
이 작업의 응답에는 다음 정보가 포함됩니다.
messages
-
메시지 스트림입니다. 이 객체 SubscriptionResponseMessage
에는 다음 정보가 포함됩니다. 각 메시지에는 jsonMessage
또는 binaryMessage
가 포함됩니다.
jsonMessage
(Python: json_message
)
-
(선택 사항) JSON 메시지입니다. 이 객체 JsonMessage
에는 다음 정보가 포함됩니다.
message
-
JSON 메시지를 객체로 표시합니다.
context
-
메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.
이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK 의 최소 버전이 나열되어 있습니다.
AWS IoT Greengrass 코어 소프트웨어는 PublishToTopic
및 SubscribeToTopic
작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass Core 소프트웨어는 구독 시 메시지에서이 컨텍스트 객체를 설정하고 게시하는 메시지에서이 컨텍스트 객체를 무시합니다.
이 객체 MessageContext
에는 다음 정보가 포함됩니다.
binaryMessage
(Python: binary_message
)
-
(선택 사항) 이진 메시지입니다. 이 객체 BinaryMessage
에는 다음 정보가 포함됩니다.
message
-
Blob인 이진 메시지입니다.
context
-
메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.
이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK 의 최소 버전이 나열되어 있습니다.
AWS IoT Greengrass 코어 소프트웨어는 PublishToTopic
및 SubscribeToTopic
작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass Core 소프트웨어는 구독 시 메시지에서이 컨텍스트 객체를 설정하고 게시하는 메시지에서이 컨텍스트 객체를 무시합니다.
이 객체 MessageContext
에는 다음 정보가 포함됩니다.
topicName
(Python: topic_name
)
-
메시지가 게시된 주제입니다.
이 속성은 현재 사용되지 않습니다. Greengrass nucleus v2.6.0 이상에서는 SubscriptionResponseMessage
에서 (jsonMessage|binaryMessage).context.topic
값을 가져와 메시지가 게시된 주제를 가져올 수 있습니다.
예시
다음 예제에서는 사용자 지정 구성 요소 코드에서 이 작업을 직접 호출하는 방법을 보여줍니다.
- Java (IPC client V2)
-
예: 로컬 게시/구독 메시지 구독
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
예: 로컬 게시/구독 메시지 구독
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
예: 로컬 게시/구독 메시지 구독
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
예: 로컬 게시/구독 메시지 구독
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
예시
다음 예제를 사용하여 구성 요소에서 게시/구독 IPC 서비스를 사용하는 방법을 알아봅니다.
다음 예제 레시피에서는 구성 요소가 모든 주제에 게시할 수 있도록 허용합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherJava",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherJava:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "java -jar {artifacts:path}/PubSubPublisher.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
'com.example.PubSubPublisherJava:pubsub:1':
policyDescription: Allows access to publish to all topics.
operations:
- 'aws.greengrass#PublishToTopic'
resources:
- '*'
Manifests:
- Lifecycle:
Run: |-
java -jar {artifacts:path}/PubSubPublisher.jar
다음 예제 Java 애플리케이션은 게시/구독 IPC 서비스를 사용하여 메시지를 다른 구성 요소에 게시하는 방법을 보여줍니다.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
public static void main(String[] args) {
String message = "Hello from the pub/sub publisher (Java).";
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
while (true) {
PublishToTopicRequest publishRequest = new PublishToTopicRequest();
PublishMessage publishMessage = new PublishMessage();
BinaryMessage binaryMessage = new BinaryMessage();
binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8));
publishMessage.setBinaryMessage(binaryMessage);
publishRequest.setPublishMessage(publishMessage);
publishRequest.setTopic(topic);
CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient
.publishToTopic(publishRequest, Optional.empty()).getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Execution exception while publishing to topic: " + topic);
}
throw e;
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
System.out.println("Publisher interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
}
다음 예제 레시피에서는 구성 요소가 모든 주제를 구독하도록 허용합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberJava",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberJava:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "java -jar {artifacts:path}/PubSubSubscriber.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberJava
ComponentVersion: '1.0.0'
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
'com.example.PubSubSubscriberJava:pubsub:1':
policyDescription: Allows access to subscribe to all topics.
operations:
- 'aws.greengrass#SubscribeToTopic'
resources:
- '*'
Manifests:
- Lifecycle:
Run: |-
java -jar {artifacts:path}/PubSubSubscriber.jar
다음 예제 Java 애플리케이션은 게시/구독 IPC 서비스를 사용하여 다른 구성 요소에 대한 메시지를 구독하는 방법을 보여줍니다.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0 */
package com.example.ipc.pubsub;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse;
import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PubSubSubscriber {
public static void main(String[] args) {
String topic = "test/topic/java";
try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection);
SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest();
subscribeRequest.setTopic(topic);
SubscribeToTopicResponseHandler operationResponseHandler = ipcClient
.subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler()));
CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse();
try {
futureResponse.get(10, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
throw e;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
System.err.println("Execution exception while subscribing to topic: " + topic);
}
throw e;
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
} catch (Exception e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> {
@Override
public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
String message = new String(subscriptionResponseMessage.getBinaryMessage()
.getMessage(), StandardCharsets.UTF_8);
System.out.println("Received new message: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
}
다음 예제 레시피에서는 구성 요소가 모든 주제에 게시할 수 있도록 허용합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherPython",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherPython:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk",
"Run": "python3 -u {artifacts:path}/pubsub_publisher.py"
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "py -3 -m pip install --user awsiotsdk",
"Run": "py -3 -u {artifacts:path}/pubsub_publisher.py"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherPython
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubPublisherPython:pubsub:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToTopic
resources:
- "*"
Manifests:
- Platform:
os: linux
Lifecycle:
install: python3 -m pip install --user awsiotsdk
Run: python3 -u {artifacts:path}/pubsub_publisher.py
- Platform:
os: windows
Lifecycle:
install: py -3 -m pip install --user awsiotsdk
Run: py -3 -u {artifacts:path}/pubsub_publisher.py
다음 예제 Python 애플리케이션은 게시/구독 IPC 서비스를 사용하여 메시지를 다른 구성 요소에 게시하는 방법을 보여줍니다.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
BinaryMessage,
UnauthorizedError
)
topic = "test/topic/python"
message = "Hello from the pub/sub publisher (Python)."
TIMEOUT = 10
try:
ipc_client = awsiot.greengrasscoreipc.connect()
while True:
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.binary_message = BinaryMessage()
publish_message.binary_message.message = bytes(message, "utf-8")
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully published to topic: ' + topic)
except concurrent.futures.TimeoutError:
print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr)
except UnauthorizedError as e:
print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while publishing to topic: ' + topic, file=sys.stderr)
raise e
time.sleep(5)
except InterruptedError:
print('Publisher interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
다음 예제 레시피에서는 구성 요소가 모든 주제를 구독하도록 허용합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberPython",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberPython:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk",
"Run": "python3 -u {artifacts:path}/pubsub_subscriber.py"
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "py -3 -m pip install --user awsiotsdk",
"Run": "py -3 -u {artifacts:path}/pubsub_subscriber.py"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberPython
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubSubscriberPython:pubsub:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToTopic
resources:
- "*"
Manifests:
- Platform:
os: linux
Lifecycle:
install: python3 -m pip install --user awsiotsdk
Run: python3 -u {artifacts:path}/pubsub_subscriber.py
- Platform:
os: windows
Lifecycle:
install: py -3 -m pip install --user awsiotsdk
Run: py -3 -u {artifacts:path}/pubsub_subscriber.py
다음 예제 Python 애플리케이션은 게시/구독 IPC 서비스를 사용하여 다른 구성 요소에 대한 메시지를 구독하는 방법을 보여줍니다.
import concurrent.futures
import sys
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage,
UnauthorizedError
)
topic = "test/topic/python"
TIMEOUT = 10
class StreamHandler(client.SubscribeToTopicStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, "utf-8")
print("Received new message: " + message)
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
print("Received a stream error.", file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
print('Subscribe to topic stream closed.')
try:
ipc_client = awsiot.greengrasscoreipc.connect()
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
operation.activate(request)
future_response = operation.get_response()
try:
future_response.result(TIMEOUT)
print('Successfully subscribed to topic: ' + topic)
except concurrent.futures.TimeoutError as e:
print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except UnauthorizedError as e:
print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr)
raise e
except Exception as e:
print('Exception while subscribing to topic: ' + topic, file=sys.stderr)
raise e
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
다음 예제 레시피에서는 구성 요소가 모든 주제에 게시할 수 있도록 허용합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubPublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubPublisherCpp:pubsub:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_pubsub_publisher"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubPublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubPublisherCpp:pubsub:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToTopic
resources:
- "*"
Manifests:
- Lifecycle:
Run: "{artifacts:path}/greengrassv2_pubsub_publisher"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher
Permission:
Execute: OWNER
다음 예제 C++ 애플리케이션은 게시/구독 IPC 서비스를 사용하여 메시지를 다른 구성 요소에 게시하는 방법을 보여줍니다.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the pub/sub publisher (C++).");
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToTopicRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
auto operation = ipcClient.NewPublishToTopic();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
다음 예제 레시피에서는 구성 요소가 모든 주제를 구독하도록 허용합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PubSubSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.PubSubSubscriberCpp:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_pub_sub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.PubSubSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.PubSubSubscriberCpp:pubsub:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToTopic
resources:
- "*"
Manifests:
- Lifecycle:
Run: "{artifacts:path}/greengrassv2_pub_sub_subscriber"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber
Permission:
Execute: OWNER
다음 예제 C++ 애플리케이션은 게시/구독 IPC 서비스를 사용하여 다른 구성 요소에 대한 메시지를 구독하는 방법을 보여줍니다.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
std::cout << "Received new message: " << messageString << std::endl;
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::cout << "Received new message: " << messageString << std::endl;
}
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to topic stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToTopicRequest request;
request.SetTopic(topic);
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}