本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 AWS IoT Device SDK 与 Greengrass 原子核、其他组件进行通信 AWS IoT Core
在核心设备上运行的组件可以使用中的 AWS IoT Greengrass 核心进程间通信 (IPC) 库与 AWS IoT Greengrass 核心和其他 Gre AWS IoT Device SDK engrass 组件进行通信。要开发和运行使用 IPC 的自定义组件,必须使用连接 AWS IoT Device SDK 到 C AWS IoT Greengrass ore IPC 服务并执行 IPC 操作。
IPC 接口支持两种类型的操作:
IPC 客户端版本
在更高版本的 Java 和 Python 中 SDKs, AWS IoT Greengrass 提供了 IPC 客户端的改进版本,名为 IPC 客户端 V2。IPC 客户端 V2:
-
减少使用 IPC 操作所需的代码量,并有助于避免 IPC 客户端 V1 可能出现的常见错误。
-
在独立的线程中调用订阅处理程序回调,因此您现在可以在订阅处理程序回调中运行阻止代码,包括其他 IPC 函数调用。IPC 客户端 V1 使用相同的线程与 IPC 服务器通信并调用订阅处理程序回调。
-
允许您使用 Lambda 表达式(Java)或函数(Python)调用订阅操作。IPC 客户端 V1 要求您定义订阅处理程序类。
-
提供每个 IPC 操作的同步和异步版本。IPC 客户端 V1 仅提供每个操作的异步版本。
我们建议使用 IPC 客户端 V2 以利用这些改进。但是,本文档和一些在线内容中的许多示例仅演示如何使用 IPC 客户端 V1。您可以使用以下示例和教程来查看使用 IPC 客户端 V2 的示例组件:
目前, AWS IoT Device SDK 适用于 C++ 的 v2 仅支持 IPC 客户端 V1。
SDKs 支持进程间通信
C AWS IoT Greengrass ore IPC 库包含在以下 AWS IoT Device SDK 版本中。
Connect 到 C AWS IoT Greengrass ore IPC 服务
要在自定义组件中使用进程间通信,必须创建与 C AWS IoT Greengrass ore 软件运行的 IPC 服务器套接字的连接。完成以下任务,下载并使用您选择 AWS IoT Device SDK 的语言。
要使用 AWS IoT Device SDK 适用于 Java v2(IPC 客户端 V2)
-
下载适用于 Java v2 的AWS IoT Device SDK(v1.6.0 或更高版本)
-
执行以下某种操作以在组件中运行自定义代码:
-
使用以下代码创建 IPC 客户端。
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
// Use client.
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
System.exit(1);
}
要使用 AWS IoT Device SDK 适用于 Python v2(IPC 客户端 V2)
-
下载适用于 Python 的AWS IoT Device SDK(v1.9.0 或更高版本)。
-
将 SDK 的安装步骤添加到组件配方中的安装生命周期。
-
创建与 AWS IoT Greengrass 核心 IPC 服务的连接。使用以下代码创建 IPC 客户端。
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
try:
ipc_client = GreengrassCoreIPCClientV2()
# Use IPC client.
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
要构建 C++ 版 AWS IoT Device SDK v2,设备必须具有以下工具:
-
C++ 11 或更高版本
-
CMake 3.1 或更高版本
-
以下编译器之一:
-
GCC 4.8 或更高版本
-
Clang 3.9 或更高版本
-
MSVC 2015 或更高版本
要使用 AWS IoT Device SDK 适用于 C++ v2 的
-
下载适用于 C++ v2 的AWS IoT Device SDK(v1.17.0 或更高版本)。
-
按照自述文件中的安装说明从源代码编译 AWS IoT Device SDK 适用于 C++ v2 的。
-
在 C++ 构建工具中,链接您在上一步中构建的 Greengrass IPC 库 AWS::GreengrassIpc-cpp
。以下CMakeLists.txt
示例将 Greengrass IPC 库链接到您用来构建的项目。 CMake
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)
file(GLOB MAIN_SRC
"*.h"
"*.cpp"
)
add_executable(${PROJECT_NAME} ${MAIN_SRC})
set_target_properties(${PROJECT_NAME} PROPERTIES
LINKER_LANGUAGE CXX
CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
-
在组件代码中,创建与 C AWS IoT Greengrass ore IPC 服务的连接以创建 IPC 客户端 () Aws::Greengrass::GreengrassCoreIpcClient
。您必须定义 IPC 连接生命周期处理程序,用于处理 IPC 连接、断开连接和错误事件。以下示例创建了 IPC 客户端和 IPC 连接生命周期处理程序,在 IPC 客户端连接、断开连接和遇到错误时进行打印。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
// Create the IPC client.
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
// Use the IPC client to create an operation request.
// Activate the operation request.
auto activate = operation.Activate(request, nullptr);
activate.wait();
// Wait for Greengrass Core to respond to the request.
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
// Check the result of the request.
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
return 0;
}
-
要在组件中运行自定义代码,请将代码构建为二进制构件,然后在组件配方中运行二进制构件。将构件的Execute
权限设置为OWNER
,以使 AWS IoT Greengrass 核心软件能够运行二进制工件。
组件配方的 Manifests
部分可能类似于以下示例。
- JSON
-
{
...
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
...
Manifests:
- Lifecycle:
Run: {artifacts:path}/greengrassv2_pubsub_subscriber
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
Permission:
Execute: OWNER
要编译 AWS IoT Device SDK 适用于 JavaScript v2 的,以便与 NodeJS 一起使用,设备必须具有以下工具:
-
NodeJS 10.0 或更高版本
-
CMake 3.1 或更高版本
要使用 AWS IoT Device SDK 适用于 JavaScript v2 的(IPC 客户端 V1)
-
下载AWS IoT Device SDK
适用于 JavaScript v2(v 1.12.10 或更高版本)的。
-
按照自述文件中的安装说明从源代码构建 JavaScript v2 版。 AWS IoT Device SDK
-
创建与 AWS IoT Greengrass 核心 IPC 服务的连接。完成以下步骤,以创建 IPC 客户端并建立连接。
-
使用以下代码创建 IPC 客户端。
import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
let client = greengrascoreipc.createClient();
-
使用以下代码,建立从组件到 Greengrass Nucleus 的连接。
await client.connect();
授权组件执行 IPC 操作
要允许您的自定义组件使用某些 IPC 操作,您必须定义授权策略,允许组件对某些资源执行操作。每个授权策略都定义了策略允许的操作列表和资源列表。例如,发布/订阅消息收发 IPC 服务定义了主题资源的发布和订阅操作。您可以使用 *
通配符以允许访问所有操作或所有资源。
您可以使用 accessControl
配置参数定义授权策略。您可以在组件配方中或在部署组件时设置该参数。accessControl
对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID,该 ID 在所有组件中必须唯一。
要创建唯一的策略 IDs,您可以组合组件名称、IPC 服务名称和计数器。例如,名为的组件com.example.HelloWorld
可以定义两个发布/订阅授权策略,如下所示: IDs
授权策略使用以下格式。此对象是 accessControl
配置参数。
- JSON
-
{
"IPC service identifier
": {
"policyId
": {
"policyDescription": "description
",
"operations": [
"operation1
",
"operation2
"
],
"resources": [
"resource1
",
"resource2
"
]
}
}
}
- YAML
-
IPC service identifier
:
policyId
:
policyDescription: description
operations:
- operation1
- operation2
resources:
- resource1
- resource2
授权策略中的通配符
您可以在 IPC 授权策略的 resources
元素中使用 *
通配符,以允许访问单个授权策略中的多个资源。
-
在所有版本的 Greengrass Nucleus 中,您可以指定单个 *
作为资源,以允许访问所有资源。
-
在 Greengrass Nucleus v2.6.0 及更高版本中,您可以在资源中指定 *
字符,以匹配任何字符组合。例如,您可以指定 factory/1/devices/Thermostat*/status
,以允许访问工厂中所有恒温器设备的状态主题,其中每台设备的名称都以 Thermostat
开头。
在为 AWS IoT Core MQTT IPC 服务定义授权策略时,也可以使用 MQTT 通配符(+
和#
)来匹配多个资源。有关更多信息,请参阅 MQTT IPC 授权策略中的 AWS IoT Core MQTT 通配符。
授权策略中的配方变量
如果你使用 Greengrass nucleus v2.6.0 或更高版本,并且将 Greeng interpolateComponentConfigurationrass nucleus 的配置选项设置为,则可以在授权策略中使用配方变量。true{iot:thingName}当您需要包含核心设备名称的授权策略(例如针对 MQTT 主题或设备影子)时,您可以使用此配方变量为一组核心设备配置单个授权策略。例如,您可以允许组件访问以下适用于影子 IPC 操作的资源。
$aws/things/{iot:thingName}/shadow/
授权策略中的特殊字符
要在授权策略中指定文字 *
或 ?
字符,您必须使用转义序列。以下转义序列指示 AWS IoT Greengrass Core 软件使用字面值而不是字符的特殊含义。例如,*
字符是与任意字符组合匹配的通配符。
文字字符 |
转义序列 |
备注 |
*
|
${*}
|
|
?
|
${?}
|
AWS IoT Greengrass 目前不支持与任何单个字符匹配的? 通配符。
|
$
|
${$}
|
使用此转义序列可匹配包含 ${ 的资源。例如,要匹配名为 ${resourceName} 的资源,您必须指定 ${$}{resourceName} 。否则,要匹配包含 $ 的资源,您可以使用文字 $ ,例如允许访问以 $aws 开头的主题。
|
授权策略示例
您可以参考以下授权策略示例,帮助您为组件配置授权策略。
例 具有授权策略的组件配方示例
以下示例组件配方包括可定义授权策略的 accessControl
对象。此策略授权 com.example.HelloWorld
组件发布到 test/topic
主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "java -jar {artifacts:path}/HelloWorld.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
"com.example.HelloWorld:pubsub:1":
policyDescription: Allows access to publish to test/topic.
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "test/topic"
Manifests:
- Lifecycle:
Run: |-
java -jar {artifacts:path}/HelloWorld.jar
例 使用授权策略的组件配置更新示例
部署中的以下配置更新示例指定使用可定义授权策略的 accessControl
对象来配置组件。此策略授权 com.example.HelloWorld
组件发布到 test/topic
主题。
- Console
-
- 要合并的配置
-
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
- AWS CLI
-
以下命令会创建对核心设备的部署。
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
hello-world-deployment.json
文件包含以下 JSON 文档。
{
"targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
"deploymentName": "Deployment for MyGreengrassCore",
"components": {
"com.example.HelloWorld": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
}
}
}
}
- Greengrass CLI
-
以下 Greengrass CLI 命令会在核心设备上创建本地部署。
sudo greengrass-cli deployment create \
--recipeDir recipes \
--artifactDir artifacts \
--merge "com.example.HelloWorld=1.0.0" \
--update-config hello-world-configuration.json
hello-world-configuration.json
文件包含以下 JSON 文档。
{
"com.example.HelloWorld": {
"MERGE": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
}
}
订阅 IPC 事件流
您可以使用 IPC 操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作,请定义订阅处理程序并创建对 IPC 服务的请求。然后,每次核心设备将事件消息流式传输到您的组件时,IPC 客户端都会运行订阅处理程序的函数。
您可以关闭订阅以停止处理事件消息。为此,请在您用于打开订阅的订阅操作对象上调用 closeStream()
(Java)、close()
(Python)或 Close()
(C++)。
C AWS IoT Greengrass ore IPC 服务支持以下订阅操作:
定义订阅处理程序
要定义订阅处理程序,请定义回调函数,处理事件消息、错误和流关闭。如果您使用 IPC 客户端 V1,则必须在类中定义这些函数。如果您使用 IPC 客户端 V2(在 Java 和 Python 的更高版本中可用) SDKs,则无需创建订阅处理程序类即可定义这些函数。
- Java
-
如果您使用 IPC 客户端 V1,则必须实现通用software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType
>
接口。 StreamEventType
是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。
如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数,也可以使用 Lambda 表达式。
void onStreamEvent(StreamEventType
event)
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
boolean onStreamError(Throwable error)
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
void onStreamClosed()
-
IPC 客户端在流关闭时调用的回调。
- Python
-
如果您使用 IPC 客户端 V1,则必须扩展与订阅操作对应的流响应处理程序类。 AWS IoT Device SDK 包括每个订阅操作的订阅处理程序类。 StreamEventType
是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。
如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数,也可以使用 Lambda 表达式。
def on_stream_event(self, event:
StreamEventType
) -> None
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
def on_stream_error(self, error: Exception) -> bool
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
def on_stream_closed(self) -> None
-
IPC 客户端在流关闭时调用的回调。
- C++
-
实施从与订阅操作对应的流响应处理程序类派生的类。 AWS IoT Device SDK 包括每个订阅操作的订阅处理程序基类。 StreamEventType
是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。
void OnStreamEvent(StreamEventType
*event)
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
bool OnStreamError(OperationError *error)
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
void OnStreamClosed()
-
IPC 客户端在流关闭时调用的回调。
- JavaScript
-
实施从与订阅操作对应的流响应处理程序类派生的类。 AWS IoT Device SDK 包括每个订阅操作的订阅处理程序基类。 StreamEventType
是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流关闭。
on(event: 'ended', listener: StreamingOperationEndedListener)
-
IPC 客户端在流关闭时调用的回调。
on(event: 'streamError', listener: StreamingRpcErrorListener)
-
IPC 客户端在流错误发生时调用的回调。
如果出现错误,则返回 True 以关闭订阅流,或返回 False 以保持流处于打开状态。
on(event: 'message', listener: (message: InboundMessageType) => void)
-
IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。
订阅处理程序示例
以下示例演示如何使用 SubscribeToTopic 操作和订阅处理程序来订阅本地发布/订阅消息。
- Java (IPC client V2)
-
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
例 示例:订阅本地发布/订阅消息
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
例 示例:订阅本地发布/订阅消息
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例 示例:订阅本地发布/订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
IPC 最佳实践
在自定义组件中使用 IPC 的最佳实践与在 IPC 客户端 V1 和 IPC 客户端 V2 之间有所不同。请遵循适用于您所使用的 IPC 客户端版本的最佳实践。
- IPC client V2
-
IPC 客户端 V2 在单独的线程中运行回调函数,因此与 IPC 客户端 V1 相比,在使用 IPC 和编写订阅处理程序函数时,您需要遵循的准则较少。
- IPC client V1
-
IPC 客户端 V1 使用单线程,与 IPC 服务器通信并调用订阅处理程序。在编写订阅处理程序函数时,您必须考虑这种同步行为。
-
重复使用一个 IPC 客户端
创建 IPC 客户端后,请将其保持打开状态并重复用于所有 IPC 操作。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。
-
异步运行阻止代码
当线程被阻止时,IPC 客户端 V1 无法发送新请求或处理新的事件消息。您应该在从处理程序函数运行的独立线程中运行阻止代码。阻止代码包括 sleep
调用、持续运行的循环,以及需要一段时间才能完成的同步 I/O 请求。
-
异步发送新的 IPC 请求
IPC 客户端 V1 无法从订阅处理程序函数中发送新请求,因为如果您等待响应,该请求会阻止处理程序函数。您应该在从处理程序函数运行的独立线程中发送 IPC 请求。
-
处理异常
IPC 客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常,则订阅关闭,并且该异常不会出现在您的组件日志中。您应该捕获处理程序函数中的异常,以确保订阅保持打开状态并记录代码中发生的错误。