使用 AWS IoT Device SDK 与 Greengrass Nucleus、其他组件和 AWS IoT Core 通信
核心设备上运行的组件可以使用 AWS IoT Device SDK 中的 AWS IoT Greengrass Core 进程间通信(IPC)库与 AWS IoT Greengrass Nucleus 和其他 Greengrass 组件通信。要开发和运行使用 IPC 的自定义组件,您必须使用 AWS IoT Device SDK 连接到 AWS IoT Greengrass Core IPC 服务并执行 IPC 操作。
IPC 接口支持两种类型的操作:
IPC 客户端版本
在更高版本的 Java 和 Python SDK 中,AWS IoT Greengrass 提供改进版本的 IPC 客户端,名为 IPC 客户端 V2。IPC 客户端 V2:
-
减少使用 IPC 操作所需的代码量,并有助于避免 IPC 客户端 V1 可能出现的常见错误。
-
在独立的线程中调用订阅处理程序回调,因此您现在可以在订阅处理程序回调中运行阻止代码,包括其他 IPC 函数调用。IPC 客户端 V1 使用相同的线程与 IPC 服务器通信并调用订阅处理程序回调。
-
允许您使用 Lambda 表达式(Java)或函数(Python)调用订阅操作。IPC 客户端 V1 要求您定义订阅处理程序类。
-
提供每个 IPC 操作的同步和异步版本。IPC 客户端 V1 仅提供每个操作的异步版本。
我们建议使用 IPC 客户端 V2 以利用这些改进。但是,本文档和一些在线内容中的许多示例仅演示如何使用 IPC 客户端 V1。您可以使用以下示例和教程来查看使用 IPC 客户端 V2 的示例组件:
目前,适用于 C++ v2 的 AWS IoT Device SDK 仅支持 IPC 客户端 V1。
适用于进程间通信的受支持 SDK
以下 AWS IoT Device SDK 版本中包含 AWS IoT Greengrass Core IPC 库。
连接到 AWS IoT Greengrass Core IPC 服务
要在自定义组件中使用进程间通信,您必须创建与 AWS IoT Greengrass Core 软件运行的 IPC 服务器套接字的连接。完成以下任务,以您所选的语言下载并使用 AWS IoT Device SDK。
使用适用于 Java v2 的 AWS IoT Device SDK(IPC 客户端 V2)
-
下载适用于 Java v2 的 AWS IoT Device SDK(v1.6.0 或更高版本)
-
执行以下某种操作以在组件中运行自定义代码:
-
使用以下代码创建 IPC 客户端。
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
// Use client.
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
System.exit(1);
}
使用适用于 Python v2 的 AWS IoT Device SDK(IPC 客户端 V2)
-
下载适用于 Python 的 AWS IoT Device SDK(v1.9.0 或更高版本)。
-
将 SDK 的安装步骤添加到组件配方中的安装生命周期。
-
创建与 AWS IoT Greengrass Core IPC 服务的连接。使用以下代码创建 IPC 客户端。
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
try:
ipc_client = GreengrassCoreIPCClientV2()
# Use IPC client.
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
要构建适用于 C++ 的 AWS IoT Device SDK v2,设备必须具有以下工具:
-
C++ 11 或更高版本
-
CMake 3.1 或更高版本
-
以下编译器之一:
-
GCC 4.8 或更高版本
-
Clang 3.9 或更高版本
-
MSVC 2015 或更高版本
使用适用于 C++ v2 的 AWS IoT Device SDK
-
下载适用于 C++ v2 的 AWS IoT Device SDK(v1.17.0 或更高版本)。
-
按照自述文件中的安装说明从源代码构建适用于 C++ v2 的 AWS IoT Device SDK。
-
在 C++ 构建工具中,链接您在上一步中构建的 Greengrass IPC 库 AWS::GreengrassIpc-cpp
。以下 CMakeLists.txt
示例将 Greengrass IPC 库链接到您使用 CMake 构建的项目。
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)
file(GLOB MAIN_SRC
"*.h"
"*.cpp"
)
add_executable(${PROJECT_NAME} ${MAIN_SRC})
set_target_properties(${PROJECT_NAME} PROPERTIES
LINKER_LANGUAGE CXX
CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
-
在组件代码中,创建与 AWS IoT Greengrass Core IPC 服务的连接,以创建 IPC 客户端 (Aws::Greengrass::GreengrassCoreIpcClient
)。您必须定义 IPC 连接生命周期处理程序,用于处理 IPC 连接、断开连接和错误事件。以下示例创建了 IPC 客户端和 IPC 连接生命周期处理程序,在 IPC 客户端连接、断开连接和遇到错误时进行打印。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
// Create the IPC client.
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
// Use the IPC client to create an operation request.
// Activate the operation request.
auto activate = operation.Activate(request, nullptr);
activate.wait();
// Wait for Greengrass Core to respond to the request.
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
// Check the result of the request.
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
return 0;
}
-
要在组件中运行自定义代码,请将代码构建为二进制构件,然后在组件配方中运行二进制构件。将构件的 Execute
权限设置为 OWNER
,以使 AWS IoT Greengrass Core 软件能够运行二进制构件。
组件配方的 Manifests
部分可能类似于以下示例。
- JSON
-
{
...
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
...
Manifests:
- Lifecycle:
run: {artifacts:path}/greengrassv2_pubsub_subscriber
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
Permission:
Execute: OWNER
要构建适用于 JavaScript v2 的 AWS IoT Device SDK 以与 NodeJS 结合使用,设备必须具有以下工具:
-
NodeJS 10.0 或更高版本
-
CMake 3.1 或更高版本
使用适用于 JavaScript v2 的 AWS IoT Device SDK(IPC 客户端 V1)
-
下载适用于 JavaScript v2 的 AWS IoT Device SDK(v1.12.10 或更高版本)。
-
按照自述文件中的安装说明从源代码构建适用于 JavaScript v2 的 AWS IoT Device SDK。
-
创建与 AWS IoT Greengrass Core IPC 服务的连接。完成以下步骤,以创建 IPC 客户端并建立连接。
-
使用以下代码创建 IPC 客户端。
import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
let client = greengrascoreipc.createClient();
-
使用以下代码,建立从组件到 Greengrass Nucleus 的连接。
await client.connect();
授权组件执行 IPC 操作
要允许您的自定义组件使用某些 IPC 操作,您必须定义授权策略,允许组件对某些资源执行操作。每个授权策略都定义了策略允许的操作列表和资源列表。例如,发布/订阅消息收发 IPC 服务定义了主题资源的发布和订阅操作。您可以使用 *
通配符以允许访问所有操作或所有资源。
您可以使用 accessControl
配置参数定义授权策略。您可以在组件配方中或在部署组件时设置该参数。accessControl
对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID,该 ID 在所有组件中必须唯一。
要创建唯一的策略 ID,您可以组合组件名称、IPC 服务名称和计数器。例如,名为 com.example.HelloWorld
的组件可以定义两个具有以下 ID 的发布/订阅授权策略:
授权策略使用以下格式。此对象是 accessControl
配置参数。
- JSON
-
{
"IPC service identifier
": {
"policyId
": {
"policyDescription": "description
",
"operations": [
"operation1
",
"operation2
"
],
"resources": [
"resource1
",
"resource2
"
]
}
}
}
- YAML
-
IPC service identifier
:
policyId
:
policyDescription: description
operations:
- operation1
- operation2
resources:
- resource1
- resource2
授权策略中的通配符
您可以在 IPC 授权策略的 resources
元素中使用 *
通配符,以允许访问单个授权策略中的多个资源。
-
在所有版本的 Greengrass Nucleus 中,您可以指定单个 *
作为资源,以允许访问所有资源。
-
在 Greengrass Nucleus v2.6.0 及更高版本中,您可以在资源中指定 *
字符,以匹配任何字符组合。例如,您可以指定 factory/1/devices/Thermostat*/status
,以允许访问工厂中所有恒温器设备的状态主题,其中每台设备的名称都以 Thermostat
开头。
在为 AWS IoT Core MQTT IPC 服务定义授权策略时,您还可以使用 MQTT 通配符(+
和 #
)来匹配多个资源。有关更多信息,请参阅 AWS IoT Core MQTT IPC 授权策略中的 MQTT 通配符。
授权策略中的配方变量
如果您使用 Greengrass Nucleus v2.6.0 或更高版本,并且将 Greengrass Nucleus 的 interpolateComponentConfiguration 配置选项设置为 true
,则可以在授权策略中使用 {iot:thingName}
配方变量。当您需要包含核心设备名称的授权策略(例如针对 MQTT 主题或设备影子)时,您可以使用此配方变量为一组核心设备配置单个授权策略。例如,您可以允许组件访问以下适用于影子 IPC 操作的资源。
$aws/things/{iot:thingName}/shadow/
授权策略中的特殊字符
要在授权策略中指定文字 *
或 ?
字符,您必须使用转义序列。以下转义序列指示 AWS IoT Greengrass Core 软件使用文字值,而不是字符的特殊含义。例如,*
字符是与任意字符组合匹配的通配符。
文字字符 |
转义序列 |
注意 |
*
|
${*}
|
|
?
|
${?}
|
AWS IoT Greengrass 目前不支持与任何单个字符匹配的 ? 通配符。
|
$
|
${$}
|
使用此转义序列可匹配包含 ${ 的资源。例如,要匹配名为 ${resourceName} 的资源,您必须指定 ${$}{resourceName} 。否则,要匹配包含 $ 的资源,您可以使用文字 $ ,例如允许访问以 $aws 开头的主题。
|
授权策略示例
您可以参考以下授权策略示例,帮助您为组件配置授权策略。
例 具有授权策略的组件配方示例
以下示例组件配方包括可定义授权策略的 accessControl
对象。此策略授权 com.example.HelloWorld
组件发布到 test/topic
主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/HelloWorld.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
"com.example.HelloWorld:pubsub:1":
policyDescription: Allows access to publish to test/topic.
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "test/topic"
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/HelloWorld.jar
例 使用授权策略的组件配置更新示例
部署中的以下配置更新示例指定使用可定义授权策略的 accessControl
对象来配置组件。此策略授权 com.example.HelloWorld
组件发布到 test/topic
主题。
- Console
-
- 要合并的配置
-
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
- AWS CLI
-
以下命令会创建对核心设备的部署。
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
hello-world-deployment.json
文件包含以下 JSON 文档。
{
"targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
"deploymentName": "Deployment for MyGreengrassCore",
"components": {
"com.example.HelloWorld": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
}
}
}
}
- Greengrass CLI
-
以下 Greengrass CLI 命令会在核心设备上创建本地部署。
sudo greengrass-cli deployment create \
--recipeDir recipes \
--artifactDir artifacts \
--merge "com.example.HelloWorld=1.0.0" \
--update-config hello-world-configuration.json
hello-world-configuration.json
文件包含以下 JSON 文档。
{
"com.example.HelloWorld": {
"MERGE": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
}
}
订阅 IPC 事件流
您可以使用 IPC 操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作,请定义订阅处理程序并创建对 IPC 服务的请求。然后,每次核心设备将事件消息流式传输到您的组件时,IPC 客户端都会运行订阅处理程序的函数。
您可以关闭订阅以停止处理事件消息。为此,请在您用于打开订阅的订阅操作对象上调用 closeStream()
(Java)、close()
(Python)或 Close()
(C++)。
AWS IoT Greengrass Core IPC 服务支持以下订阅操作:
定义订阅处理程序
要定义订阅处理程序,请定义回调函数,处理事件消息、错误和流关闭。如果您使用 IPC 客户端 V1,则必须在类中定义这些函数。如果您使用 IPC 客户端 V2(适用于更高版本的 Java 和 Python SDK),则无需创建订阅处理程序类即可定义这些函数。
- Java
-
如果您使用 IPC 客户端 V1,则必须实施通用 software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType
>
接口。StreamEventType
是订阅操作的事件消息类型。定义以下函数以处理事件消息、错误和流关闭。
如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数,也可以使用 Lambda 表达式。
void onStreamEvent(StreamEventType
event)
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
boolean onStreamError(Throwable error)
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
void onStreamClosed()
-
IPC 客户端在流关闭时调用的回调。
- Python
-
如果您使用 IPC 客户端 V1,则必须扩展与订阅操作对应的流响应处理程序类。AWS IoT Device SDK 包括每个订阅操作的订阅处理程序类。StreamEventType
是订阅操作的事件消息类型。定义以下函数以处理事件消息、错误和流关闭。
如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数,也可以使用 Lambda 表达式。
def on_stream_event(self, event:
StreamEventType
) -> None
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
def on_stream_error(self, error: Exception) -> bool
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
def on_stream_closed(self) -> None
-
IPC 客户端在流关闭时调用的回调。
- C++
-
实施从与订阅操作对应的流响应处理程序类派生的类。AWS IoT Device SDK 包括每个订阅操作的订阅处理程序基类。StreamEventType
是订阅操作的事件消息类型。定义以下函数以处理事件消息、错误和流关闭。
void OnStreamEvent(StreamEventType
*event)
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
bool OnStreamError(OperationError *error)
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
void OnStreamClosed()
-
IPC 客户端在流关闭时调用的回调。
- JavaScript
-
实施从与订阅操作对应的流响应处理程序类派生的类。AWS IoT Device SDK 包括每个订阅操作的订阅处理程序基类。StreamEventType
是订阅操作的事件消息类型。定义以下函数以处理事件消息、错误和流关闭。
on(event: 'ended', listener: StreamingOperationEndedListener)
-
IPC 客户端在流关闭时调用的回调。
on(event: 'streamError', listener: StreamingRpcErrorListener)
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
on(event: 'message', listener: (message: InboundMessageType) => void)
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
订阅处理程序示例
以下示例演示如何使用 SubscribeToTopic 操作和订阅处理程序来订阅本地发布/订阅消息。
- Java (IPC client V2)
-
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
例 示例:订阅本地发布/订阅消息
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
例 示例:订阅本地发布/订阅消息
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例 示例:订阅本地发布/订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
IPC 最佳实践
在自定义组件中使用 IPC 的最佳实践与在 IPC 客户端 V1 和 IPC 客户端 V2 之间有所不同。请遵循适用于您所使用的 IPC 客户端版本的最佳实践。
- IPC client V2
-
IPC 客户端 V2 在单独的线程中运行回调函数,因此与 IPC 客户端 V1 相比,在使用 IPC 和编写订阅处理程序函数时,您需要遵循的准则较少。
- IPC client V1
-
IPC 客户端 V1 使用单线程,与 IPC 服务器通信并调用订阅处理程序。在编写订阅处理程序函数时,您必须考虑这种同步行为。
-
重复使用一个 IPC 客户端
创建 IPC 客户端后,请将其保持打开状态并重复用于所有 IPC 操作。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。
-
异步运行阻止代码
当线程被阻止时,IPC 客户端 V1 无法发送新请求或处理新的事件消息。您应该在从处理程序函数运行的独立线程中运行阻止代码。阻止代码包括 sleep
调用、持续运行的循环,以及需要一段时间才能完成的同步 I/O 请求。
-
异步发送新的 IPC 请求
IPC 客户端 V1 无法从订阅处理程序函数中发送新请求,因为如果您等待响应,该请求会阻止处理程序函数。您应该在从处理程序函数运行的独立线程中发送 IPC 请求。
-
处理异常
IPC 客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常,则订阅关闭,并且该异常不会出现在您的组件日志中。您应该捕获处理程序函数中的异常,以确保订阅保持打开状态并记录代码中发生的错误。