本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:通过以下方式与本地 IoT 设备互动 MQTT
您可以完成本教程,将核心设备配置为与通过该核心设备连接的本地 IoT 设备(称为客户端设备)进行交互MQTT。在本教程中, AWS IoT 您将配置为使用云发现作为客户端设备连接到核心设备。配置云发现时,客户端设备可以向 AWS IoT Greengrass 云服务发送请求以发现核心设备。来自 AWS IoT Greengrass 的响应包括通过配置客户端设备发现的核心设备的连接信息和证书。然后,客户端设备可以使用此信息连接到可用的核心设备,通过该设备进行通信MQTT。
在本教程中,您将执行以下操作:
-
如有必要,请查看和更新核心设备的权限。
-
将客户端设备与核心设备关联,以便客户端设备可以使用云发现发现核心设备。
-
将 Greengrass 组件部署到核心设备以启用客户端设备支持。
-
将客户端设备连接到核心设备并测试与 AWS IoT Core 云服务的通信。
-
开发与客户端设备通信的自定义 Greengrass 组件。
-
开发与客户端设备的 AWS IoT 设备影子交互的自定义组件。
本教程使用单个核心设备和单个客户端设备。您还可以按照教程连接和测试多个客户端设备。
您预计需要花费 30–60 分钟来完成本教程。
先决条件
要完成本教程,您需要:
-
一个 AWS 账户。如果没有,请参阅设置一个 AWS 账户。
-
具有管理员权限的 AWS Identity and Access Management (IAM) 用户。
-
Greengrass 核心设备。有关如何设置核心设备的更多信息,请参阅设置 AWS IoT Greengrass 核心设备。
-
核心设备必须运行 Greengrass nucleus v2.6.0 或更高版本。此版本包括对本地发布/订阅通信中的通配符的支持以及对客户端设备影子的支持。
注意
客户端设备支持需要 Greengrass nucleus v2.2.0 或更高版本。但是,本教程探讨了较新的功能,例如在本地发布/订阅中支持MQTT通配符以及支持客户端设备阴影。这些功能需要 Greengrass nucleus v2.6.0 或更高版本。
-
核心设备必须与客户端设备位于同一个网络上才能连接。
-
(可选)要完成开发自定义 Greengrass 组件的模块,核心设备必须运行 Greengrass。CLI有关更多信息,请参阅 安装 Greengrass CLI。
-
-
在本教程中,可以作为客户端设备进行连接。 AWS IoT 有关更多信息,请参阅《AWS IoT Core 开发人员指南》中的创建 AWS IoT 资源。
-
客户端设备的 AWS IoT 策略必须允许该
greengrass:Discover
权限。有关更多信息,请参阅 客户端设备的最低 AWS IoT 策略。 -
客户端设备必须与核心设备位于同一个网络上。
-
客户端设备必须运行 Python 3
。 -
客户端设备必须运行 Git
。
-
步骤 1:查看并更新核心设备 AWS IoT 政策
要支持客户端设备,核心设备的 AWS IoT 策略必须允许以下权限:
-
greengrass:PutCertificateAuthorities
-
greengrass:VerifyClientDeviceIdentity
-
greengrass:VerifyClientDeviceIoTCertificateAssociation
-
greengrass:GetConnectivityInfo
-
greengrass:UpdateConnectivityInfo
—(可选)使用 IP 检测器组件需要此权限,该组件将核心设备的网络连接信息报告给 AWS IoT Greengrass 云服务。
有关核心设备的这些权限和 AWS IoT 策略的更多信息,请参阅数据面板操作的 AWS IoT 策略和支持客户端设备的最低 AWS IoT 政策。
在本节中,您将查看核心设备的 AWS IoT 策略并添加缺少的所有必需权限。如果您使用AWS IoT Greengrass 核心软件安装程序来配置资源,则您的核心设备具有允许访问所有 AWS IoT Greengrass 操作的 AWS IoT 策略(greengrass:*
)。在这种情况下,只有当您计划将影子管理器组件配置为与其同步设备影子时,才必须更新 AWS IoT 策略 AWS IoT Core。否则,您可以跳过此部分。
查看和更新核心设备的 AWS IoT 政策
-
在 AWS IoT Greengrass 控制台
导航菜单中,选择核心设备。 -
在核心设备页面上,选择要更新的核心设备。
-
在核心设备详细信息页面上,选择指向核心设备的事物的链接。此链接可打开 AWS IoT 控制台中的事物详细信息页面。
-
在“事物详细信息”页面上,选择证书。
-
在证书选项卡中,选择事物的有效证书。
-
在证书详细信息页面上,选择策略。
-
在 “策略” 选项卡中,选择要查看和更新的 AWS IoT 策略。您可以为附加到核心设备有效证书的任何策略添加所需权限。
注意
如果您使用AWS IoT Greengrass 核心软件安装程序来配置资源,则有两个 AWS IoT 策略。我们建议您选择名为的策略 GreengrassV2IoTThingPolicy,如果存在。默认情况下,使用快速安装程序创建的核心设备将使用此策略名称。如果您为此策略添加权限,则也会将这些权限授予使用此策略的其他核心设备。
-
在策略概述中,选择编辑活动版本。
-
查看策略以了解所需权限,然后添加缺失的所有必要权限。
-
要将新的策略版本设置为活动版本,请在策略版本状态下,选择将编辑后的版本设置为该策略的活动版本。
-
选择另存为新版本。
第 2 步:启用客户端设备支持
要使客户端设备使用云发现连接到核心设备,您必须关联这些设备。当您将客户端设备与核心设备关联时,可以让该客户端设备检索核心设备的 IP 地址和证书以用于连接。
要使客户端设备能够安全地连接到核心设备并与 Greengrass 组件通信,请将以下 Greengrass 组件 AWS IoT Core部署到核心设备:
-
客户端设备身份验证 (
aws.greengrass.clientdevices.Auth
)部署客户端设备身份验证组件,以对客户端设备进行身份验证并授权客户端设备操作。这个组件允许你的 AWS IoT 东西连接到核心设备。
此组件需要进行一些配置才能使用。您必须指定客户端设备组以及每个组有权执行的操作,例如连接和通信MQTT。有关更多信息,请参阅客户端设备身份验证组件配置。
-
MQTT 3.1.1 代理(Moquette) (
aws.greengrass.clientdevices.mqtt.Moquette
)部署 Moquette MQTT 经纪商组件来运行轻量级MQTT经纪商。Moquette MQTT 代理符合 MQTT 3.1.1,包括对 QoS 0、QoS 1、QoS 2、保留消息、最后遗嘱消息和永久订阅的本地支持。
您无需配置此组件即可使用此组件。但是,您可以配置此组件操作MQTT代理的端口。默认情况下,使用端口 8883。
-
MQTT 网桥 (
aws.greengrass.clientdevices.mqtt.Bridge
)(可选)部署 MQTT bridge 组件以在客户端设备(本地MQTT)、本地发布/订阅和之间中继消息。 AWS IoT Core MQTT将此组件配置为与 Greengrass 组件中的客户端设备同步 AWS IoT Core 并与客户端设备进行交互。
此组件需要配置才能使用。您必须指定此组件用于中继消息的主题映射。有关更多信息,请参阅 MQTTbridge 组件配置。
-
IP 检测器 (
aws.greengrass.clientdevices.IPDetector
)(可选)部署 IP 检测器组件,自动向 AWS IoT Greengrass 云服务报告核心设备的MQTT代理端点。如果您的网络设置很复杂,例如路由器将MQTT代理端口转发到核心设备的网络设置,则不能使用此组件。
您无需配置此组件即可使用此组件。
在本节中,您将使用 AWS IoT Greengrass 控制台关联客户端设备并将客户端设备组件部署到核心设备。
要启用客户端设备支持,请执行以下操作
-
在左侧导航菜单中,选择核心设备。
-
在核心设备页面上,选择要在其中启用客户端设备支持的核心设备。
-
在核心设备详细信息页面上,选择客户端设备选项卡。
-
在客户端设备选项卡上,选择配置云发现。
配置核心设备发现页面会打开。在此页面上,您可以将客户端设备与核心设备关联并部署客户端设备组件。本页在第 1 步:选择目标核心设备中为您选择核心设备。
注意
您也可以使用此页为事物组配置核心设备发现。如果选择此选项,则可以将客户端设备组件部署到事物组中的所有核心设备。但是,如果选择此选项,则必须在创建部署后手动将客户端设备与每台核心设备相关联。在本教程中,您将配置单个核心设备。
-
在步骤 2:关联客户端设备中,将客户端设备的设备 AWS IoT 与核心设备相关联。这使客户端设备能够使用云发现来检索核心设备的连接信息和证书。执行以下操作:
-
选择关联客户端设备。
-
在 “将客户端设备与核心设备关联” 模式中,输入要关联 AWS IoT 的事物的名称。
-
选择 添加。
-
选择关联 。
-
-
在第 3 步:配置和部署 Greengrass 组件中,部署组件以启用客户端设备支持。如果目标核心设备有以前的部署,则此页面将修改该部署。否则,此页面将为核心设备创建新的部署。执行以下操作以配置和部署客户端设备组件:
-
核心设备必须运行 Greengrass nucleus v2.6.0 或更高版本才能完成本教程。如果核心设备运行较早版本,请执行以下操作:
-
选中要部署的复选框 aws.greengrass.Nucleus组件。
-
对于 aws.greengrass.Nucleus组件,选择编辑配置。
-
对于组件版本,选择版本 2.6.0 或更高版本。
-
选择确认。
注意
如果您将 Greengrass nucleus 从较早的次要版本升级,并且核心设备AWS运行依赖于该核的组件,则还必须将提供的组件更新到较新的版本。 AWS在本教程的后面部分中查看部署时,可以配置这些组件的版本。有关更多信息,请参阅 更新 AWS IoT Greengrass Core 软件(OTA)。
-
-
对于 aws.greengrass.clientdevices.Auth组件,选择编辑配置。
-
在客户端设备身份验证组件的编辑配置模式中,配置授权策略,允许客户端设备在核心设备上发布和订阅MQTT代理。执行以下操作:
-
在配置下的要合并的配置代码块中,输入以下配置,其中包含客户端设备授权策略。每个设备组授权策略都指定了一组操作以及客户端设备可以用来执行这些操作的资源。
-
此策略允许名称
MyClientDevice
以开头的客户端设备就所有MQTT主题进行连接和通信。MyClientDevice*
替换为要作为客户端设备连接 AWS IoT 的事物的名称。您也可以使用*
通配符指定与客户端设备名称相匹配的名称。*
通配符必须位于名称的末尾。如果您要连接第二台客户端设备,请替换为该客户端设备的名称或
MyOtherClientDevice*
与该客户端设备名称匹配的通配符模式。否则,您可以删除或保留选择规则的这一部分,该部分允许名称与MyOtherClientDevice*
匹配的客户端设备进行连接和通信。 -
此策略还使用
OR
操作员来允许名称MyOtherClientDevice
以开头的客户端设备就所有MQTT主题进行连接和通信。您可以删除选择规则中的这一子句,也可以对其进行修改以匹配要连接的客户端设备。 -
此政策允许客户端设备发布和订阅所有MQTT主题。要遵循最佳安全实践,请将
mqtt:publish
和mqtt:subscribe
操作限制在客户端设备用于通信的最少一组话题上。
{ "deviceGroups": { "formatVersion": "2021-03-05", "definitions": { "MyDeviceGroup": { "selectionRule": "thingName:
MyClientDevice*
OR thingName:MyOtherClientDevice*
", "policyName": "MyClientDevicePolicy" } }, "policies": { "MyClientDevicePolicy": { "AllowConnect": { "statementDescription": "Allow client devices to connect.", "operations": [ "mqtt:connect" ], "resources": [ "*" ] }, "AllowPublish": { "statementDescription": "Allow client devices to publish to all topics.", "operations": [ "mqtt:publish" ], "resources": [ "*" ] }, "AllowSubscribe": { "statementDescription": "Allow client devices to subscribe to all topics.", "operations": [ "mqtt:subscribe" ], "resources": [ "*" ] } } } } }有关更多信息,请参阅客户端设备身份验证组件配置。
-
-
选择确认。
-
-
对于 aws.greengrass.clientdevices.mqtt.Bridge组件,选择编辑配置。
-
在 MQTT bridge 组件的编辑配置模式中,配置主题映射,将MQTT消息从客户端设备中继到 AWS IoT Core。执行以下操作:
-
在配置下的要合并的配置代码块中,输入以下配置。此配置指定将
clients/+/hello/world
主题过滤器上的MQTT消息从客户端设备中继到 AWS IoT Core 云服务。例如,此主题筛选器与clients/MyClientDevice1/hello/world
主题匹配。{ "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" } } }
有关更多信息,请参阅 MQTTbridge 组件配置。
-
选择确认。
-
-
-
选择查看和部署,以查看此页为您创建的部署。
-
如果您之前未在此区域设置过 Greengrass 服务角色,则控制台会打开一个模式来为您设置服务角色。客户端设备身份验证组件使用此服务角色来验证客户端设备的身份,IP 检测器组件使用此服务角色来管理核心设备连接信息。选择 Grant permissions(授予权限)。
-
在查看页面上,选择部署以开始部署到核心设备。
-
要验证部署是否成功,请检查部署状态并检查核心设备上的日志。要查看核心设备上的部署状态,可以在部署概述中选择目标。有关更多信息,请参阅下列内容:
第 3 步:连接客户端设备
客户端设备可以使用 AWS IoT Device SDK 来发现、连接核心设备并与之通信。客户端设备必须是一个 AWS IoT 东西。有关更多信息,请参阅《AWS IoT Core 开发人员指南》中的创建事物对象。
在本节中,您将安装适用于 Python 的AWS IoT Device SDK v2
注意
AWS IoT Device SDK 还提供其他编程语言版本。本教程使用适用于 Python 的 AWS IoT Device SDK v2,但你可以根据自己的SDKs用例探索另一个版本。有关更多信息,请参阅《AWS IoT Core 开发者指南》SDKs中的AWS IoT 设备。
要将客户端设备连接到核心设备,请执行以下操作
-
下载适用于 Python 的AWS IoT Device SDK v2
并将其安装到要作为客户端设备连接的设备上。 AWS IoT 在客户端设备上,执行以下操作:
-
克隆适用于 Python 的 AWS IoT Device SDK v2 版本库进行下载。
git clone https://github.com/aws/aws-iot-device-sdk-python-v2.git
-
安装适用于 Python 的 AWS IoT Device SDK v2。
python3 -m pip install --user ./aws-iot-device-sdk-python-v2
-
-
在 python 版 AWS IoT Device SDK v2 中切换到示例文件夹。
cd aws-iot-device-sdk-python-v2/samples
-
运行示例 Greengrass Discovery 应用程序。此应用程序需要指定客户端设备事物名称、要使用的MQTT主题和消息以及用于验证和保护连接的证书的参数。以下示例向
clients/
主题发送“Hello World”消息。MyClientDevice1
/hello/world注意
本主题与您配置MQTT网桥以将消息中继到 AWS IoT Core 之前的主题相匹配。
-
MyClientDevice1
替换为客户端设备的事物名称。 -
~/certs/AmazonRootCA1.pem
替换为客户端设备上的 Amazon 根 CA 证书的路径。 -
~/certs/device.pem.crt
替换为客户端设备上设备证书的路径。 -
~/certs/private.pem.key
替换为客户端设备上私钥文件的路径。 -
us-east-1
替换为您的客户端设备和核心设备运行的地 AWS 区。
python3 basic_discovery.py \\ --thing_name
MyClientDevice1
\\ --topic 'clients/MyClientDevice1
/hello/world' \\ --message 'Hello World!' \\ --ca_file~/certs/AmazonRootCA1.pem
\\ --cert~/certs/device.pem.crt
\\ --key~/certs/private.pem.key
\\ --regionus-east-1
\\ --verbosity Warn发现示例应用程序发送消息 10 次并断开连接。它还订阅在其中发布消息的同一主题。如果输出显示应用程序收到了有关该主题的MQTT消息,则客户端设备可以成功地与核心设备通信。
Performing greengrass discovery... awsiot.greengrass_discovery.DiscoverResponse(gg_groups=[awsiot.greengrass_discovery.GGGroup(gg_group_id='greengrassV2-coreDevice-MyGreengrassCore', cores=[awsiot.greengrass_discovery.GGCore(thing_arn='arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore', connectivity=[awsiot.greengrass_discovery.ConnectivityInfo(id='203.0.113.0', host_address='203.0.113.0', metadata='', port=8883)])], certificate_authorities=['-----BEGIN CERTIFICATE-----\
MIICiT...EXAMPLE=
\ -----END CERTIFICATE-----\ '])]) Trying core arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore at host 203.0.113.0 port 8883 Connected! Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 0} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 0}' Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 1} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 1}'...
Published topic clients/MyClientDevice1/hello/world: {"message": "Hello World!", "sequence": 9} Publish received on topic clients/MyClientDevice1/hello/world b'{"message": "Hello World!", "sequence": 9}'如果应用程序输出的是错误,请参阅 Greengrass 发现问题疑难解答。
您还可以查看核心设备上的 Greengrass 日志,以验证客户端设备是否成功连接和发送消息。有关更多信息,请参阅 监控 AWS IoT Greengrass 日志。
-
-
验证MQTT网桥是否将消息从客户端设备中继到 AWS IoT Core。您可以使用 AWS IoT Core 控制台中的MQTT测试客户端订阅MQTT主题筛选器。执行以下操作:
-
导航至 AWS IoT 控制台
。 -
在左侧导航菜单的 “测试” 下,选择 “MQTT测试客户端”。
-
在订阅主题选项卡上的主题筛选器中,输入
clients/+/hello/world
以订阅来自核心设备的客户端设备消息。 -
选择订阅。
-
再次在客户端设备上运行发布/订阅应用程序。
MQTT测试客户端显示您从客户端设备发送的与该主题过滤器匹配的主题的消息。
-
第 4 步:开发与客户端设备通信的组件
您可以开发与客户端设备通信的 Greengrass 组件。组件使用进程间通信 (IPC) 和本地发布/订阅接口在核心设备上进行通信。要与客户端设备交互,请将MQTT网桥组件配置为在客户端设备和本地发布/订阅接口之间中继消息。
在本节中,您将更新MQTT网桥组件以将来自客户端设备的消息中继到本地发布/订阅接口。然后,您将开发用于订阅这些消息并在收到消息时打印这些消息的组件。
要开发与客户端设备通信的组件,请执行以下操作
-
修改核心设备的部署,并将MQTT网桥组件配置为将来自客户端设备的消息中继到本地发布/订阅。执行以下操作:
-
在左侧导航菜单中,选择核心设备。
-
在核心设备页面上,选择本教程中使用的核心设备。
-
在核心设备详细信息页面上,选择客户端设备选项卡。
-
在客户端设备选项卡上,选择配置云发现。
配置核心设备发现页面会打开。在此页面上,您可以更改或配置部署到核心设备的客户端设备组件。
-
在步骤 3 中,对于 aws.greengrass.clientdevices.mqtt.Bridge组件,选择编辑配置。
-
在 MQTT bridge 组件的编辑配置模式中,配置主题映射,将来自客户端设备的MQTT消息中继到本地发布/订阅接口。执行以下操作:
-
在配置下的要合并的配置代码块中,输入以下配置。此配置指定将与主题过滤器匹配的主题上的MQTT消息从客户端设备中继到 AWS IoT Core 云服务和本地 Greengrass 发布/订阅代理。
clients/+/hello/world
{ "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" }, "HelloWorldPubsubMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "Pubsub" } } }
有关更多信息,请参阅 MQTTbridge 组件配置。
-
选择确认。
-
-
选择查看和部署,以查看此页为您创建的部署。
-
在查看页面上,选择部署以开始部署到核心设备。
-
要验证部署是否成功,请检查部署状态并检查核心设备上的日志。要查看核心设备上的部署状态,可以在部署概述中选择目标。有关更多信息,请参阅下列内容:
-
-
开发和部署用于订阅来自客户端设备的“Hello World”消息的 Greengrass 组件。执行以下操作:
-
在核心设备上为配方和构件创建文件夹。
重要
必须为构件文件夹路径使用以下格式。其中包括您在配方中指定的组件名称和版本。
artifacts/
componentName
/componentVersion
/ -
使用文本编辑器创建组件配方,其中包含以下内容。此配方指定安装适用于 Python 的 AWS IoT Device SDK v2 并运行订阅该主题并打印消息的脚本。
例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano recipes/com.example.clientdevices.MyHelloWorldSubscriber-1.0.0.json
将以下配方复制到该文件中。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.clientdevices.MyHelloWorldSubscriber", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to Hello World messages from client devices.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.clientdevices.MyHelloWorldSubscriber:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/hello_world_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/hello_world_subscriber.py" } } ] }
-
使用文本编辑器创建名为
hello_world_subscriber.py
的 Python 脚本构件,其中包含以下内容。此应用程序使用发布/订阅IPC服务来订阅clients/+/hello/world
主题并打印它收到的消息。例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/hello_world_subscriber.py
将以下 Python 代码复制到该文件中。
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 CLIENT_DEVICE_HELLO_WORLD_TOPIC = 'clients/+/hello/world' TIMEOUT = 10 def on_hello_world_message(event): try: message = str(event.binary_message.message, 'utf-8') print('Received new message: %s' % message) except: traceback.print_exc() try: ipc_client = GreengrassCoreIPCClientV2() # SubscribeToTopic returns a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic( topic=CLIENT_DEVICE_HELLO_WORLD_TOPIC, on_stream_event=on_hello_world_message) print('Successfully subscribed to topic: %s' % CLIENT_DEVICE_HELLO_WORLD_TOPIC) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') operation.close() except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
注意
此组件使用适用于 Python 的AWS IoT Device SDK v2 中的IPC客户端 V2
与 AWS IoT Greengrass 核心软件进行通信。与原始IPC客户端相比,IPC客户端 V2 减少了IPC在自定义组件中使用所需编写的代码量。 -
使用 G CLI reengrass 部署组件。
-
-
查看组件日志,以验证组件是否成功安装并订阅了该主题。
您可以保持日志源处于打开状态,以验证核心设备是否收到消息。
-
在客户端设备上,再次运行示例 Greengrass 发现应用程序,以向核心设备发送消息。
python3 basic_discovery.py \\ --thing_name
MyClientDevice1
\\ --topic 'clients/MyClientDevice1
/hello/world' \\ --message 'Hello World!' \\ --ca_file~/certs/AmazonRootCA1.pem
\\ --cert~/certs/device.pem.crt
\\ --key~/certs/private.pem.key
\\ --regionus-east-1
\\ --verbosity Warn -
再次查看组件日志,以验证该组件是否收到并打印了来自客户端设备的消息。
第 5 步:开发与客户端设备影子交互的组件
您可以开发与客户端设备的 AWS IoT 设备影子交互的 Greengrass 组件。影子是存储诸如客户端设备 AWS IoT 之类的当前或所需状态信息的JSON文档。即使客户端设备未连接到 AWS IoT,自定义组件也可以访问客户端设备的影子以管理其状态。每 AWS IoT 件事物都有一个未命名的阴影,你也可以为每个事物创建多个命名的阴影。
在本节中,您将部署影子管理器组件来管理核心设备上的影子。您还可以更新MQTT网桥组件,以便在客户端设备和影子管理器组件之间中继影子消息。然后,开发用于更新客户端设备影子的组件,并在客户端设备上运行示例应用程序,以响应来自该组件的影子更新。该组件代表智能灯管理应用程序,其中核心设备管理作为客户端设备与核心设备连接的智能灯的颜色状态。
要开发与客户端设备影子交互的组件,请执行以下操作
-
修改核心设备的部署以部署影子管理器组件,并将MQTT桥接组件配置为在客户端设备和影子管理器通信的本地发布/订阅之间中继影子消息。执行以下操作:
-
在左侧导航菜单中,选择核心设备。
-
在核心设备页面上,选择本教程中使用的核心设备。
-
在核心设备详细信息页面上,选择客户端设备选项卡。
-
在客户端设备选项卡上,选择配置云发现。
配置核心设备发现页面会打开。在此页面上,您可以更改或配置部署到核心设备的客户端设备组件。
-
在步骤 3 中,对于 aws.greengrass.clientdevices.mqtt.Bridge组件,选择编辑配置。
-
在 MQTT bridge 组件的编辑配置模式中,配置主题映射,以便在客户端设备和本地发布/订阅接口之间中继有关设备影子主题的MQTT消息。您还要确认部署指定了兼容的MQTT网桥版本。客户端设备影子支持需要MQTT网桥 v2.2.0 或更高版本。执行以下操作:
-
对于组件版本,选择版本 2.2.0 或更高版本。
-
在配置下的要合并的配置代码块中,输入以下配置。此配置指定在影子主题上中继MQTT消息。
{ "mqttTopicMapping": { "HelloWorldIotCoreMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "IotCore" }, "HelloWorldPubsubMapping": { "topic": "clients/+/hello/world", "source": "LocalMqtt", "target": "Pubsub" }, "ShadowsLocalMqttToPubsub": { "topic": "$aws/things/+/shadow/#", "source": "LocalMqtt", "target": "Pubsub" }, "ShadowsPubsubToLocalMqtt": { "topic": "$aws/things/+/shadow/#", "source": "Pubsub", "target": "LocalMqtt" } } }
有关更多信息,请参阅 MQTTbridge 组件配置。
-
选择确认。
-
-
在步骤 3 中,选择 aws.greengrass.ShadowManager用于部署它的组件。
-
选择查看和部署,以查看此页为您创建的部署。
-
在查看页面上,选择部署以开始部署到核心设备。
-
要验证部署是否成功,请检查部署状态并检查核心设备上的日志。要查看核心设备上的部署状态,可以在部署概述中选择目标。有关更多信息,请参阅下列内容:
-
-
开发和部署用于管理智能灯客户端设备的 Greengrass 组件。执行以下操作:
-
在核心设备上为组件的构件创建文件夹。
重要
必须为构件文件夹路径使用以下格式。其中包括您在配方中指定的组件名称和版本。
artifacts/
componentName
/componentVersion
/ -
使用文本编辑器创建组件配方,其中包含以下内容。此配方指定安装适用于 Python 的 AWS IoT Device SDK v2 并运行一个脚本,该脚本与智能灯光客户端设备的阴影交互以管理其颜色。
例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano recipes/com.example.clientdevices.MySmartLightManager-1.0.0.json
将以下配方复制到该文件中。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.clientdevices.MySmartLightManager", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that interacts with smart light client devices.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.Nucleus": { "VersionRequirement": "^2.6.0" }, "aws.greengrass.ShadowManager": { "VersionRequirement": "^2.2.0" }, "aws.greengrass.clientdevices.mqtt.Bridge": { "VersionRequirement": "^2.2.0" } }, "ComponentConfiguration": { "DefaultConfiguration": { "smartLightDeviceNames": [], "accessControl": { "aws.greengrass.ShadowManager": { "com.example.clientdevices.MySmartLightManager:shadow:1": { "policyDescription": "Allows access to client devices' unnamed shadows", "operations": [ "aws.greengrass#GetThingShadow", "aws.greengrass#UpdateThingShadow" ], "resources": [ "$aws/things/MyClientDevice*/shadow" ] } }, "aws.greengrass.ipc.pubsub": { "com.example.clientdevices.MySmartLightManager:pubsub:1": { "policyDescription": "Allows access to client devices' unnamed shadow updates", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "$aws/things/+/shadow/update/accepted" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/smart_light_manager.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/smart_light_manager.py" } } ] }
-
使用文本编辑器创建名为
smart_light_manager.py
的 Python 脚本构件,其中包含以下内容。此应用程序使用影子IPC服务来获取和更新客户端设备影子,使用本地发布/订阅IPC服务来接收报告的影子更新。例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano artifacts/com.example.clientdevices.MySmartLightManager/1.0.0/smart_light_manager.py
将以下 Python 代码复制到该文件中。
import json import random import sys import time import traceback from uuid import uuid4 from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ResourceNotFoundError SHADOW_COLOR_PROPERTY = 'color' CONFIGURATION_CLIENT_DEVICE_NAMES = 'smartLightDeviceNames' COLORS = ['red', 'orange', 'yellow', 'green', 'blue', 'purple'] SHADOW_UPDATE_TOPIC = '$aws/things/+/shadow/update/accepted' SET_COLOR_INTERVAL = 15 class SmartLightDevice(): def __init__(self, client_device_name: str, reported_color: str = None): self.name = client_device_name self.reported_color = reported_color self.desired_color = None class SmartLightDeviceManager(): def __init__(self, ipc_client: GreengrassCoreIPCClientV2): self.ipc_client = ipc_client self.devices = {} self.client_tokens = set() self.shadow_update_accepted_subscription_operation = None self.client_device_names_configuration_subscription_operation = None self.update_smart_light_device_list() def update_smart_light_device_list(self): # Update the device list from the component configuration. response = self.ipc_client.get_configuration( key_path=[CONFIGURATION_CLIENT_DEVICE_NAMES]) # Identify the difference between the configuration and the currently tracked devices. current_device_names = self.devices.keys() updated_device_names = response.value[CONFIGURATION_CLIENT_DEVICE_NAMES] added_device_names = set(updated_device_names) - set(current_device_names) removed_device_names = set(current_device_names) - set(updated_device_names) # Stop tracking any smart light devices that are no longer in the configuration. for name in removed_device_names: print('Removing %s from smart light device manager' % name) self.devices.pop(name) # Start tracking any new smart light devices that are in the configuration. for name in added_device_names: print('Adding %s to smart light device manager' % name) device = SmartLightDevice(name) device.reported_color = self.get_device_reported_color(device) self.devices[name] = device print('Current color for %s is %s' % (name, device.reported_color)) def get_device_reported_color(self, smart_light_device): try: response = self.ipc_client.get_thing_shadow( thing_name=smart_light_device.name, shadow_name='') shadow = json.loads(str(response.payload, 'utf-8')) if 'reported' in shadow['state']: return shadow['state']['reported'].get(SHADOW_COLOR_PROPERTY) return None except ResourceNotFoundError: return None def request_device_color_change(self, smart_light_device, color): # Generate and track a client token for the request. client_token = str(uuid4()) self.client_tokens.add(client_token) # Create a shadow payload, which must be a blob. payload_json = { 'state': { 'desired': { SHADOW_COLOR_PROPERTY: color } }, 'clientToken': client_token } payload = bytes(json.dumps(payload_json), 'utf-8') self.ipc_client.update_thing_shadow( thing_name=smart_light_device.name, shadow_name='', payload=payload) smart_light_device.desired_color = color def subscribe_to_shadow_update_accepted_events(self): if self.shadow_update_accepted_subscription_operation == None: # SubscribeToTopic returns a tuple with the response and the operation. _, self.shadow_update_accepted_subscription_operation = self.ipc_client.subscribe_to_topic( topic=SHADOW_UPDATE_TOPIC, on_stream_event=self.on_shadow_update_accepted_event) print('Successfully subscribed to shadow update accepted topic') def close_shadow_update_accepted_subscription(self): if self.shadow_update_accepted_subscription_operation is not None: self.shadow_update_accepted_subscription_operation.close() def on_shadow_update_accepted_event(self, event): try: message = str(event.binary_message.message, 'utf-8') accepted_payload = json.loads(message) # Check for reported states from smart light devices and ignore desired states from components. if 'reported' in accepted_payload['state']: # Process this update only if it uses a client token created by this component. client_token = accepted_payload.get('clientToken') if client_token is not None and client_token in self.client_tokens: self.client_tokens.remove(client_token) shadow_state = accepted_payload['state']['reported'] if SHADOW_COLOR_PROPERTY in shadow_state: reported_color = shadow_state[SHADOW_COLOR_PROPERTY] topic = event.binary_message.context.topic client_device_name = topic.split('/')[2] if client_device_name in self.devices: # Set the reported color for the smart light device. self.devices[client_device_name].reported_color = reported_color print( 'Received shadow update confirmation from client device: %s' % client_device_name) else: print("Shadow update doesn't specify color") except: traceback.print_exc() def subscribe_to_client_device_name_configuration_updates(self): if self.client_device_names_configuration_subscription_operation == None: # SubscribeToConfigurationUpdate returns a tuple with the response and the operation. _, self.client_device_names_configuration_subscription_operation = self.ipc_client.subscribe_to_configuration_update( key_path=[CONFIGURATION_CLIENT_DEVICE_NAMES], on_stream_event=self.on_client_device_names_configuration_update_event) print( 'Successfully subscribed to configuration updates for smart light device names') def close_client_device_names_configuration_subscription(self): if self.client_device_names_configuration_subscription_operation is not None: self.client_device_names_configuration_subscription_operation.close() def on_client_device_names_configuration_update_event(self, event): try: if CONFIGURATION_CLIENT_DEVICE_NAMES in event.configuration_update_event.key_path: print('Received configuration update for list of client devices') self.update_smart_light_device_list() except: traceback.print_exc() def choose_random_color(): return random.choice(COLORS) def main(): try: # Create an IPC client and a smart light device manager. ipc_client = GreengrassCoreIPCClientV2() smart_light_manager = SmartLightDeviceManager(ipc_client) smart_light_manager.subscribe_to_shadow_update_accepted_events() smart_light_manager.subscribe_to_client_device_name_configuration_updates() try: # Keep the main thread alive, or the process will exit. while True: # Set each smart light device to a random color at a regular interval. for device_name in smart_light_manager.devices: device = smart_light_manager.devices[device_name] desired_color = choose_random_color() print('Chose random color (%s) for %s' % (desired_color, device_name)) if desired_color == device.desired_color: print('Desired color for %s is already %s' % (device_name, desired_color)) elif desired_color == device.reported_color: print('Reported color for %s is already %s' % (device_name, desired_color)) else: smart_light_manager.request_device_color_change( device, desired_color) print('Requested color change for %s to %s' % (device_name, desired_color)) time.sleep(SET_COLOR_INTERVAL) except InterruptedError: print('Application interrupted') smart_light_manager.close_shadow_update_accepted_subscription() smart_light_manager.close_client_device_names_configuration_subscription() except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) if __name__ == '__main__': main()
该 Python 应用程序执行以下操作:
-
读取组件的配置以获取要管理的智能灯客户端设备列表。
-
使用SubscribeToConfigurationUpdateIPC操作订阅配置更新通知。每次组件的配置发生变化时,C AWS IoT Greengrass ore 软件都会发送通知。当组件收到配置更新通知时,组件会更新其管理的智能灯客户端设备列表。
-
获取每台智能灯客户端设备的影子以获取其初始颜色状态。
-
将每台智能灯客户端设备的颜色设置为每 15 秒随机颜色一次。该组件更新客户端设备的事物影子以更改其颜色。此操作通过向客户端设备发送阴影增量事件MQTT。
-
使用操作在本地发布/订阅界面上订阅影子更新已接受的消息。SubscribeToTopic IPC该组件接收这些消息以跟踪每台智能灯客户端设备的颜色。当智能轻型客户端设备收到影子更新时,它会发送一条MQTT消息以确认已收到更新。MQTT网桥将此消息中继到本地发布/订阅接口。
-
-
使用 G CLI reengrass 部署组件。部署此组件时,需要指定客户端设备
smartLightDeviceNames
的列表,它将管理这些设备的影子。MyClientDevice1
替换为客户端设备的事物名称。
-
-
查看组件日志,以验证组件是否成功安装和运行。
该组件发送请求以更改智能灯客户端设备的颜色。影子管理器接收请求并设置影子的
desired
状态。但是,智能灯客户端设备尚未运行,因此影子的reported
状态不会改变。该组件的日志包含以下消息。2022-07-07T03:49:24.908Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Chose random color (blue) for MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.912Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Requested color change for MyClientDevice1 to blue. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING}
您可以将日志源保持打开状态,以查看组件何时打印消息。
-
下载并运行使用 Greengrass 发现功能并订阅设备影子更新的示例应用程序。在客户端设备上,执行以下操作:
-
在 python 版 AWS IoT Device SDK v2 中切换到示例文件夹。此示例应用程序使用示例文件夹中的命令行解析模块。
cd aws-iot-device-sdk-python-v2/samples
-
使用文本编辑器创建名为
basic_discovery_shadow.py
的 Python 脚本,其中包含以下内容。此应用程序使用 Greengrass 发现和影子来保持客户端设备和核心设备之间的属性同步。例如,在基于 Linux 的系统上,你可以运行以下命令来使用 GNU nano 来创建文件。
nano basic_discovery_shadow.py
将以下 Python 代码复制到该文件中。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0. from awscrt import io from awscrt import mqtt from awsiot import iotshadow from awsiot.greengrass_discovery import DiscoveryClient from awsiot import mqtt_connection_builder from concurrent.futures import Future import sys import threading import traceback from uuid import uuid4 # Parse arguments import utils.command_line_utils; cmdUtils = utils.command_line_utils.CommandLineUtils("Basic Discovery - Greengrass discovery example with device shadows.") cmdUtils.add_common_mqtt_commands() cmdUtils.add_common_topic_message_commands() cmdUtils.add_common_logging_commands() cmdUtils.register_command("key", "<path>", "Path to your key in PEM format.", True, str) cmdUtils.register_command("cert", "<path>", "Path to your client certificate in PEM format.", True, str) cmdUtils.remove_command("endpoint") cmdUtils.register_command("thing_name", "<str>", "The name assigned to your IoT Thing", required=True) cmdUtils.register_command("region", "<str>", "The region to connect through.", required=True) cmdUtils.register_command("shadow_property", "<str>", "The name of the shadow property you want to change (optional, default='color'", default="color") # Needs to be called so the command utils parse the commands cmdUtils.get_args() # Using globals to simplify sample code is_sample_done = threading.Event() mqtt_connection = None shadow_thing_name = cmdUtils.get_command_required("thing_name") shadow_property = cmdUtils.get_command("shadow_property") SHADOW_VALUE_DEFAULT = "off" class LockedData: def __init__(self): self.lock = threading.Lock() self.shadow_value = None self.disconnect_called = False self.request_tokens = set() locked_data = LockedData() def on_connection_interupted(connection, error, **kwargs): print('connection interrupted with error {}'.format(error)) def on_connection_resumed(connection, return_code, session_present, **kwargs): print('connection resumed with return code {}, session present {}'.format(return_code, session_present)) # Try IoT endpoints until we find one that works def try_iot_endpoints(): for gg_group in discover_response.gg_groups: for gg_core in gg_group.cores: for connectivity_info in gg_core.connectivity: try: print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port)) mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=connectivity_info.host_address, port=connectivity_info.port, cert_filepath=cmdUtils.get_command_required("cert"), pri_key_filepath=cmdUtils.get_command_required("key"), ca_bytes=gg_group.certificate_authorities[0].encode('utf-8'), on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed, client_id=cmdUtils.get_command_required("thing_name"), clean_session=False, keep_alive_secs=30) connect_future = mqtt_connection.connect() connect_future.result() print('Connected!') return mqtt_connection except Exception as e: print('Connection failed with exception {}'.format(e)) continue exit('All connection attempts failed') # Function for gracefully quitting this sample def exit(msg_or_exception): if isinstance(msg_or_exception, Exception): print("Exiting sample due to exception.") traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) else: print("Exiting sample:", msg_or_exception) with locked_data.lock: if not locked_data.disconnect_called: print("Disconnecting...") locked_data.disconnect_called = True future = mqtt_connection.disconnect() future.add_done_callback(on_disconnected) def on_disconnected(disconnect_future): # type: (Future) -> None print("Disconnected.") # Signal that sample is finished is_sample_done.set() def on_get_shadow_accepted(response): # type: (iotshadow.GetShadowResponse) -> None try: with locked_data.lock: # check that this is a response to a request from this session try: locked_data.request_tokens.remove(response.client_token) except KeyError: return print("Finished getting initial shadow state.") if locked_data.shadow_value is not None: print(" Ignoring initial query because a delta event has already been received.") return if response.state: if response.state.delta: value = response.state.delta.get(shadow_property) if value: print(" Shadow contains delta value '{}'.".format(value)) change_shadow_value(value) return if response.state.reported: value = response.state.reported.get(shadow_property) if value: print(" Shadow contains reported value '{}'.".format(value)) set_local_value_due_to_initial_query(response.state.reported[shadow_property]) return print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property)) change_shadow_value(SHADOW_VALUE_DEFAULT) return except Exception as e: exit(e) def on_get_shadow_rejected(error): # type: (iotshadow.ErrorResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(error.client_token) except KeyError: return if error.code == 404: print("Thing has no shadow document. Creating with defaults...") change_shadow_value(SHADOW_VALUE_DEFAULT) else: exit("Get request was rejected. code:{} message:'{}'".format( error.code, error.message)) except Exception as e: exit(e) def on_shadow_delta_updated(delta): # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None try: print("Received shadow delta event.") if delta.state and (shadow_property in delta.state): value = delta.state[shadow_property] if value is None: print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property)) change_shadow_value(SHADOW_VALUE_DEFAULT) return else: print(" Delta reports that desired value is '{}'. Changing local value...".format(value)) if (delta.client_token is not None): print (" ClientToken is: " + delta.client_token) change_shadow_value(value, delta.client_token) else: print(" Delta did not report a change in '{}'".format(shadow_property)) except Exception as e: exit(e) def on_publish_update_shadow(future): #type: (Future) -> None try: future.result() print("Update request published.") except Exception as e: print("Failed to publish update request.") exit(e) def on_update_shadow_accepted(response): # type: (iotshadow.UpdateShadowResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(response.client_token) except KeyError: return try: if response.state.reported != None: if shadow_property in response.state.reported: print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore else: print ("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore else: print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None except: exit("Updated shadow is missing the target property") except Exception as e: exit(e) def on_update_shadow_rejected(error): # type: (iotshadow.ErrorResponse) -> None try: # check that this is a response to a request from this session with locked_data.lock: try: locked_data.request_tokens.remove(error.client_token) except KeyError: return exit("Update request was rejected. code:{} message:'{}'".format( error.code, error.message)) except Exception as e: exit(e) def set_local_value_due_to_initial_query(reported_value): with locked_data.lock: locked_data.shadow_value = reported_value def change_shadow_value(value, token=None): with locked_data.lock: if locked_data.shadow_value == value: print("Local value is already '{}'.".format(value)) return print("Changed local shadow value to '{}'.".format(value)) locked_data.shadow_value = value print("Updating reported shadow value to '{}'...".format(value)) reuse_token = token is not None # use a unique token so we can correlate this "request" message to # any "response" messages received on the /accepted and /rejected topics if not reuse_token: token = str(uuid4()) # if the value is "clear shadow" then send a UpdateShadowRequest with None # for both reported and desired to clear the shadow document completely. if value == "clear_shadow": tmp_state = iotshadow.ShadowState(reported=None, desired=None, reported_is_nullable=True, desired_is_nullable=True) request = iotshadow.UpdateShadowRequest( thing_name=shadow_thing_name, state=tmp_state, client_token=token, ) # Otherwise, send a normal update request else: # if the value is "none" then set it to a Python none object to # clear the individual shadow property if value == "none": value = None request = iotshadow.UpdateShadowRequest( thing_name=shadow_thing_name, state=iotshadow.ShadowState( reported={ shadow_property: value } ), client_token=token, ) future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE) if not reuse_token: locked_data.request_tokens.add(token) future.add_done_callback(on_publish_update_shadow) if __name__ == '__main__': tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(cmdUtils.get_command_required("cert"), cmdUtils.get_command_required("key")) if cmdUtils.get_command(cmdUtils.m_cmd_ca_file): tls_options.override_default_trust_store_from_path(None, cmdUtils.get_command(cmdUtils.m_cmd_ca_file)) tls_context = io.ClientTlsContext(tls_options) socket_options = io.SocketOptions() print('Performing greengrass discovery...') discovery_client = DiscoveryClient(io.ClientBootstrap.get_or_create_static_default(), socket_options, tls_context, cmdUtils.get_command_required("region")) resp_future = discovery_client.discover(cmdUtils.get_command_required("thing_name")) discover_response = resp_future.result() print(discover_response) if cmdUtils.get_command("print_discover_resp_only"): exit(0) mqtt_connection = try_iot_endpoints() shadow_client = iotshadow.IotShadowClient(mqtt_connection) try: # Subscribe to necessary topics. # Note that is **is** important to wait for "accepted/rejected" subscriptions # to succeed before publishing the corresponding "request". print("Subscribing to Update responses...") update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted( request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_update_shadow_accepted) update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected( request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_update_shadow_rejected) # Wait for subscriptions to succeed update_accepted_subscribed_future.result() update_rejected_subscribed_future.result() print("Subscribing to Get responses...") get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted( request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_get_shadow_accepted) get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected( request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_get_shadow_rejected) # Wait for subscriptions to succeed get_accepted_subscribed_future.result() get_rejected_subscribed_future.result() print("Subscribing to Delta events...") delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events( request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name), qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_shadow_delta_updated) # Wait for subscription to succeed delta_subscribed_future.result() # The rest of the sample runs asynchronously. # Issue request for shadow's current state. # The response will be received by the on_get_accepted() callback print("Requesting current shadow state...") with locked_data.lock: # use a unique token so we can correlate this "request" message to # any "response" messages received on the /accepted and /rejected topics token = str(uuid4()) publish_get_future = shadow_client.publish_get_shadow( request=iotshadow.GetShadowRequest(thing_name=shadow_thing_name, client_token=token), qos=mqtt.QoS.AT_LEAST_ONCE) locked_data.request_tokens.add(token) # Ensure that publish succeeds publish_get_future.result() except Exception as e: exit(e) # Wait for the sample to finish (user types 'quit', or an error occurs) is_sample_done.wait()
该 Python 应用程序执行以下操作:
-
使用 Greengrass 发现功能来发现并连接到核心设备。
-
从核心设备请求影子文档以获取属性的初始状态。
-
订阅影子增量事件,当属性的
desired
值与其reported
值不同时,核心设备会发送这些事件。当应用程序收到影子增量事件时,它会更改属性的值并向核心设备发送更新以将新值设置为其reported
值。
此应用程序结合了 Greengrass 发现和 AWS IoT Device SDK v2 中的影子采样。
-
-
运行示例应用程序。此应用程序需要指定客户端设备事物名称、要使用的影子属性以及用于验证和保护连接的证书的参数。
-
MyClientDevice1
替换为客户端设备的事物名称。 -
~/certs/AmazonRootCA1.pem
替换为客户端设备上的 Amazon 根 CA 证书的路径。 -
~/certs/device.pem.crt
替换为客户端设备上设备证书的路径。 -
~/certs/private.pem.key
替换为客户端设备上私钥文件的路径。 -
us-east-1
替换为您的客户端设备和核心设备运行的地 AWS 区。
python3 basic_discovery_shadow.py \ --thing_name
MyClientDevice1
\ --shadow_property color \ --ca_file~/certs/AmazonRootCA1.pem
\ --cert~/certs/device.pem.crt
\ --key~/certs/private.pem.key
\ --regionus-east-1
\ --verbosity Warn示例应用程序订阅影子主题并等待接收来自核心设备的影子增量事件。如果输出表明应用程序接收并响应影子增量事件,则客户端设备可以成功地与其在核心设备上的影子进行交互。
Performing greengrass discovery... awsiot.greengrass_discovery.DiscoverResponse(gg_groups=[awsiot.greengrass_discovery.GGGroup(gg_group_id='greengrassV2-coreDevice-MyGreengrassCore', cores=[awsiot.greengrass_discovery.GGCore(thing_arn='arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore', connectivity=[awsiot.greengrass_discovery.ConnectivityInfo(id='203.0.113.0', host_address='203.0.113.0', metadata='', port=8883)])], certificate_authorities=['-----BEGIN CERTIFICATE-----\n
MIICiT...EXAMPLE=
\n-----END CERTIFICATE-----\n'])]) Trying core arn:aws:iot:us-east-1:123456789012:thing/MyGreengrassCore at host 203.0.113.0 port 8883 Connected! Subscribing to Update responses... Subscribing to Get responses... Subscribing to Delta events... Requesting current shadow state... Received shadow delta event. Delta reports that desired value is 'purple'. Changing local value... ClientToken is: 3dce4d3f-e336-41ac-aa4f-7882725f0033 Changed local shadow value to 'purple'. Updating reported shadow value to 'purple'... Update request published.如果应用程序输出的是错误,请参阅 Greengrass 发现问题疑难解答。
您还可以查看核心设备上的 Greengrass 日志,以验证客户端设备是否成功连接和发送消息。有关更多信息,请参阅 监控 AWS IoT Greengrass 日志。
-
-
-
再次查看组件日志,以验证该组件是否收到来自智能灯客户端设备的影子更新确认。
该组件记录消息以确认智能灯客户端设备是否已更改其颜色。
2022-07-07T03:49:24.908Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Chose random color (blue) for MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.912Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Requested color change for MyClientDevice1 to blue. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING} 2022-07-07T03:49:24.959Z [INFO] (Copier) com.example.clientdevices.MySmartLightManager: stdout. Received shadow update confirmation from client device: MyClientDevice1. {scriptName=services.com.example.clientdevices.MySmartLightManager.lifecycle.Run, serviceName=com.example.clientdevices.MySmartLightManager, currentState=RUNNING}
注意
客户端设备的影子在核心设备和客户端设备之间同步。但是,核心设备不会将客户端设备的影子与同步 AWS IoT Core。例如,您可以将影子与同步, AWS IoT Core 以查看或修改队列中所有设备的状态。有关如何配置阴影管理器组件以与之同步阴影的更多信息 AWS IoT Core,请参阅将本地设备影子与 AWS IoT Core 同步。
您已完成本教程。客户端设备连接到核心设备,向 AWS IoT Core 和 Greengrass 组件发送MQTT消息,并从核心设备接收影子更新。有关本教程中涵盖的主题的更多信息,请参阅以下内容: