發佈/訂閱本地訊息 - AWS IoT Greengrass

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

發佈/訂閱本地訊息

發佈/訂閱 (pubsub) 訊息可讓您傳送和接收主題的訊息。組件可以將消息發佈到主題以將消息發送到其他組件。接著,訂閱該主題的元件就可以對其收到的訊息採取行動。

注意

您不能使用此發布/訂閱 IPC 服務來發布或訂閱 MQTT。 AWS IoT Core 如需如何與 AWS IoT Core MQTT 交換郵件的詳細資訊,請參閱。發布/訂閱MQTT 訊 AWS IoT Core 息

最低 SDK 版本

下表列出在本機主題中發行及訂閱訊息時,必須使用的最低版本。 AWS IoT Device SDK

授權

若要在自訂元件中使用本機發佈/訂閱訊息,您必須定義允許元件傳送和接收訊息至主題的授權原則。如需定義授權原則的資訊,請參閱授權元件執行 IPC 作業

發佈/訂閱訊息的授權原則具有下列屬性。

IPC 服務識別碼:aws.greengrass.ipc.pubsub

作業 描述 資源

aws.greengrass#PublishToTopic

允許元件將訊息發佈到您指定的主題。

主題字串,例如test/topic。使用*來比對主題中任何字元組合。

此主題字串不支援 MQTT 主題萬用字元 (#和)。+

aws.greengrass#SubscribeToTopic

允許元件訂閱您指定主題的訊息。

主題字串,例如test/topic。使用*來比對主題中任何字元組合。

Greengrass 核心 v2.6.0 及更新版本中,您可以訂閱包含 MQTT 主題萬用字元 (和) 的主題。# +此主題字串支援 MQTT 主題萬用字元做為常值字元。例如,如果元件的授權原則授與存取權test/topic/#,則該元件可以訂閱test/topic/#,但無法訂閱test/topic/filter

*

允許元件針對您指定的主題發佈和訂閱訊息。

主題字串,例如test/topic。使用*來比對主題中任何字元組合。

Greengrass 核心 v2.6.0 及更新版本中,您可以訂閱包含 MQTT 主題萬用字元 (和) 的主題。# +此主題字串支援 MQTT 主題萬用字元做為常值字元。例如,如果元件的授權原則授與存取權test/topic/#,則該元件可以訂閱test/topic/#,但無法訂閱test/topic/filter

授權政策範例

您可以參考下列授權原則範例,協助您設定元件的授權原則。

範例 授權政策範例

下列範例授權原則可讓元件發佈和訂閱所有主題。

{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.MyLocalPubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }

PublishToTopic

發布訊息至主題。

請求

此操作的請求具有以下參數:

topic

要發佈郵件的目標主題。

publishMessage(Python:publish_message)

要發佈的訊息。此物件PublishMessage包含下列資訊。您必須指定jsonMessage與之一binaryMessage

jsonMessage(Python:json_message)

(選擇性) JSON 訊息。此物件JsonMessage包含下列資訊:

message

JSON 消息作為一個對象。

context

訊息的內容,例如發佈郵件的主題。

此功能適用於 v2.6.0 及更高版 Greeng rass 核組件。下表列出存取訊息前後關聯 AWS IoT Device SDK 所必須使用的最低版本。

注意

AWS IoT Greengrass 核心軟體在PublishToTopicSubscribeToTopic作業中使用相同的訊息物件。當您訂閱時, AWS IoT Greengrass Core 軟體會在訊息中設定此前後關聯物件,並在您發佈的訊息中忽略此前後關聯物件。

此物件MessageContext包含下列資訊:

topic

發佈郵件的主題。

binaryMessage(Python:binary_message)

(選擇性) 二進位訊息。此物件BinaryMessage包含下列資訊:

message

二進制消息作為一個 blob。

context

訊息的內容,例如發佈郵件的主題。

此功能適用於 v2.6.0 及更高版 Greeng rass 核組件。下表列出存取訊息前後關聯 AWS IoT Device SDK 所必須使用的最低版本。

注意

AWS IoT Greengrass 核心軟體在PublishToTopicSubscribeToTopic作業中使用相同的訊息物件。當您訂閱時, AWS IoT Greengrass Core 軟體會在訊息中設定此前後關聯物件,並在您發佈的訊息中忽略此前後關聯物件。

此物件MessageContext包含下列資訊:

topic

發佈郵件的主題。

回應

此操作在其響應中不提供任何信息。

範例

下列範例示範如何在自訂元件程式碼中呼叫此作業。

Java (IPC client V2)
範例:發佈二進位訊息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.BinaryMessage; import software.amazon.awssdk.aws.greengrass.model.PublishMessage; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import java.nio.charset.StandardCharsets; public class PublishToTopicV2 { public static void main(String[] args) { String topic = args[0]; String message = args[1]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static PublishToTopicResponse publishBinaryMessageToTopic( GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException { BinaryMessage binaryMessage = new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8)); PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage); PublishToTopicRequest publishToTopicRequest = new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage); return ipcClient.publishToTopic(publishToTopicRequest); } }
Python (IPC client V2)
範例:發佈二進位訊息
import sys import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( PublishMessage, BinaryMessage ) def main(): args = sys.argv[1:] topic = args[0] message = args[1] try: ipc_client = GreengrassCoreIPCClientV2() publish_binary_message_to_topic(ipc_client, topic, message) print('Successfully published to topic: ' + topic) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def publish_binary_message_to_topic(ipc_client, topic, message): binary_message = BinaryMessage(message=bytes(message, 'utf-8')) publish_message = PublishMessage(binary_message=binary_message) return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message) if __name__ == '__main__': main()
C++
範例:發佈二進位訊息
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); String message("Hello, World!"); int timeout = 10; PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
範例:發佈二進位訊息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; private readonly messageString : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.messageString = "<define_your_message_string>"; this.publishToTopic().then(r => console.log("Started workflow")); } private async publishToTopic() { try { this.ipcClient = await getIpcClient(); const binaryMessage : BinaryMessage = { message: this.messageString } const publishMessage : PublishMessage = { binaryMessage: binaryMessage } const request : PublishToTopicRequest = { topic: this.topic, publishMessage: publishMessage } this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToTopic = new PublishToTopic();

SubscribeToTopic

訂閱有關主題的訊息。

此作業是訂閱作業,您可以在其中訂閱事件訊息串流。若要使用此作業,請定義具有處理事件訊息、錯誤和資料流結束之函數的串流回應處理常式。如需詳細資訊,請參閱 訂閱 IPC 事件串流

事件訊息類型:SubscriptionResponseMessage

請求

此操作的請求具有以下參數:

topic

要訂閱的主題。

注意

Greengrass 核 v2.6.0 及更新版本中,本主題支援 MQTT 主題萬用字元 (和)。# +

receiveMode(Python:receive_mode)

(選擇性) 指定元件是否從本身接收訊息的行為。您可以變更此行為,以允許元件對自己的訊息執行動作。預設行為取決於主題是否包含 MQTT 萬用字元。您可以從以下選項中選擇:

  • RECEIVE_ALL_MESSAGES— 接收符合主題的所有訊息,包括來自訂閱元件的訊息。

    當您訂閱不包含 MQTT 萬用字元的主題時,此模式為預設選項。

  • RECEIVE_MESSAGES_FROM_OTHERS— 接收符合主題的所有訊息,但來自訂閱元件的訊息除外。

    當您訂閱包含 MQTT 萬用字元的主題時,此模式是預設選項。

此功能適用於 v2.6.0 及更高版 Greeng rass 核組件。下表列出設定接收模式 AWS IoT Device SDK 所必須使用的最低版本。

回應

此作業的回應包含下列資訊:

messages

訊息串流。此物件SubscriptionResponseMessage包含下列資訊。每個訊息都包含jsonMessagebinaryMessage

jsonMessage(Python:json_message)

(選擇性) JSON 訊息。此物件JsonMessage包含下列資訊:

message

JSON 消息作為一個對象。

context

訊息的內容,例如發佈郵件的主題。

此功能適用於 v2.6.0 及更高版 Greeng rass 核組件。下表列出存取訊息前後關聯 AWS IoT Device SDK 所必須使用的最低版本。

注意

AWS IoT Greengrass 核心軟體在PublishToTopicSubscribeToTopic作業中使用相同的訊息物件。當您訂閱時, AWS IoT Greengrass Core 軟體會在訊息中設定此前後關聯物件,並在您發佈的訊息中忽略此前後關聯物件。

此物件MessageContext包含下列資訊:

topic

發佈郵件的主題。

binaryMessage(Python:binary_message)

(選擇性) 二進位訊息。此物件BinaryMessage包含下列資訊:

message

二進制消息作為一個 blob。

context

訊息的內容,例如發佈郵件的主題。

此功能適用於 v2.6.0 及更高版 Greeng rass 核組件。下表列出存取訊息前後關聯 AWS IoT Device SDK 所必須使用的最低版本。

注意

AWS IoT Greengrass 核心軟體在PublishToTopicSubscribeToTopic作業中使用相同的訊息物件。當您訂閱時, AWS IoT Greengrass Core 軟體會在訊息中設定此前後關聯物件,並在您發佈的訊息中忽略此前後關聯物件。

此物件MessageContext包含下列資訊:

topic

發佈郵件的主題。

topicName(Python:topic_name)

郵件發行目標的主題。

注意

此屬性目前未使用。在 Greengrass 核 v2.6.0 及更高版本中,您可以從 a SubscriptionResponseMessage 中獲取(jsonMessage|binaryMessage).context.topic值以獲取發布消息的主題。

範例

下列範例示範如何在自訂元件程式碼中呼叫此作業。

Java (IPC client V2)
範例:訂閱本地發佈/訂閱訊息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
範例:訂閱本地發佈/訂閱訊息
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
範例:訂閱本地發佈/訂閱訊息
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
範例:訂閱本地發佈/訂閱訊息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

範例

使用下列範例,瞭解如何在元件中使用發佈/訂閱 IPC 服務。

下列範例方案可讓元件發佈至所有主題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherJava:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/PubSubPublisher.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherJava ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubPublisherJava:pubsub:1': policyDescription: Allows access to publish to all topics. operations: - 'aws.greengrass#PublishToTopic' resources: - '*' Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/PubSubPublisher.jar

下列範例 Java 應用程式示範如何使用發佈/訂閱 IPC 服務,將訊息發佈至其他元件。

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }

下列範例方案可讓元件訂閱所有主題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberJava:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/PubSubSubscriber.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberJava ComponentVersion: '1.0.0' ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubSubscriberJava:pubsub:1': policyDescription: Allows access to subscribe to all topics. operations: - 'aws.greengrass#SubscribeToTopic' resources: - '*' Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/PubSubSubscriber.jar

下列範例 Java 應用程式示範如何使用發佈/訂閱 IPC 服務來訂閱其他元件的訊息。

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }

下列範例方案可讓元件發佈至所有主題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherPython:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "run": "python3 -u {artifacts:path}/pubsub_publisher.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "run": "py -3 -u {artifacts:path}/pubsub_publisher.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherPython ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherPython:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk run: python3 -u {artifacts:path}/pubsub_publisher.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk run: py -3 -u {artifacts:path}/pubsub_publisher.py

以下示例 Python 應用程序演示瞭如何使用發布/訂閱 IPC 服務將消息發布到其他組件。

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

下列範例方案可讓元件訂閱所有主題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberPython:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "run": "python3 -u {artifacts:path}/pubsub_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "run": "py -3 -u {artifacts:path}/pubsub_subscriber.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberPython ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberPython:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk run: python3 -u {artifacts:path}/pubsub_subscriber.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk run: py -3 -u {artifacts:path}/pubsub_subscriber.py

以下示例 Python 應用程序演示瞭如何使用發布/訂閱 IPC 服務來訂閱消息到其他組件。

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

下列範例方案可讓元件發佈至所有主題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherCpp:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_publisher" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherCpp:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_pubsub_publisher" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher Permission: Execute: OWNER

下列範例 C++ 應用程式示範如何使用發佈/訂閱 IPC 服務,將訊息發佈至其他元件。

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

下列範例方案可讓元件訂閱所有主題。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberCpp:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pub_sub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberCpp:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_pub_sub_subscriber" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber Permission: Execute: OWNER

下列範例 C++ 應用程式示範如何使用發佈/訂閱 IPC 服務來訂閱其他元件的訊息。

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }