本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
AWS IoT Core MQTT 訊息 IPC 服務可讓您在 之間傳送和接收 MQTT 訊息 AWS IoT Core。元件可以將訊息發佈到 AWS IoT Core 並訂閱主題,以對來自其他來源的 MQTT 訊息採取行動。如需 MQTT AWS IoT Core 實作的詳細資訊,請參閱《 AWS IoT Core 開發人員指南》中的 MQTT。
注意
此 MQTT 訊息 IPC 服務可讓您與 交換訊息 AWS IoT Core。如需如何在元件之間交換訊息的詳細資訊,請參閱發佈/訂閱本機訊息。
最低 SDK 版本
下表列出 AWS IoT Device SDK 您必須用來發佈和訂閱 MQTT 訊息的最小版本 AWS IoT Core。
SDK | 最低版本 |
---|---|
v1.2.10 |
|
1.5.3 版 |
|
1.17.0 版 |
|
v1.12.0 |
授權
若要在自訂元件中使用 AWS IoT Core MQTT 訊息,您必須定義授權政策,允許元件傳送和接收主題的訊息。如需定義授權政策的資訊,請參閱 授權元件來執行 IPC 操作。
AWS IoT Core MQTT 訊息的授權政策具有下列屬性。
IPC 服務識別符: aws.greengrass.ipc.mqttproxy
作業 | 描述 | 資源 |
---|---|---|
|
允許元件在您指定的 MQTT 主題 AWS IoT Core 上發佈訊息至 。 |
主題字串,例如 |
|
允許元件訂閱您指定主題 AWS IoT Core 上的訊息。 |
主題字串,例如 |
|
允許元件發佈和訂閱您指定主題的 AWS IoT Core MQTT 訊息。 |
主題字串,例如 |
MQTT 授權政策中的 AWS IoT Core MQTT 萬用字元
您可以在 MQTT IPC 授權政策中使用 AWS IoT Core MQTT 萬用字元。元件可以發佈和訂閱符合您在授權政策中允許的主題篩選條件的主題。例如,如果元件的授權政策授予對 的存取權test/topic/#
,則該元件可以訂閱 test/topic/#
,而且可以發佈和訂閱 test/topic/filter
。
AWS IoT Core MQTT 授權政策中的配方變數
如果您使用 Greengrass 核的 v2.6.0 或更新版本,您可以在授權政策中使用{iot:thingName}
配方變數。此功能可讓您為一組核心裝置設定單一授權政策,其中每個核心裝置只能存取包含自己的名稱的主題。例如,您可以允許元件存取下列主題資源。
devices/{iot:thingName}/messages
如需詳細資訊,請參閱 配方變數 和 在合併更新中使用配方變數。
授權政策範例
您可以參考下列授權政策範例,協助您設定元件的授權政策。
範例 具有無限制存取的授權政策範例
下列範例授權政策允許元件發佈和訂閱所有主題。
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
範例 具有有限存取權的授權政策範例
下列範例授權政策允許元件發佈和訂閱兩個名為 factory/1/events
和 的主題factory/1/actions
。
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
範例 一組核心裝置的授權政策範例
重要
此範例使用適用於 Greengrass 核元件 v2.6.0 和更新版本的功能。Greengrass nucleus 2.6.0 版新增了對元件組態中大多數配方變數的支援{iot:thingName}
,例如 。
下列範例授權政策允許元件發佈和訂閱主題,其中包含執行元件的核心裝置名稱。
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
PublishToIoTCore
在主題 AWS IoT Core 上將 MQTT 訊息發佈至 。
當您發佈 MQTT 訊息到 時 AWS IoT Core,每秒有 100 筆交易的配額。如果您超過此配額,訊息會排入佇列,以便在 Greengrass 裝置上處理。每秒也有 512 Kb 的資料配額,以及每秒 20,000 個發佈的全帳戶配額 (部分為 2,000 個 AWS 區域)。如需 中 MQTT 訊息代理程式限制的詳細資訊 AWS IoT Core,請參閱AWS IoT Core 訊息代理程式和通訊協定限制和配額。
如果您超過這些配額,Greengrass 裝置會將發佈訊息限制為 AWS IoT Core。訊息存放在記憶體中的多工緩衝處理程式中。根據預設,配置給多工緩衝處理器的記憶體為 2.5 Mb。如果多工緩衝處理器填滿,則會拒絕新訊息。您可以增加多工緩衝處理的大小。如需詳細資訊,請參閱 Greengrass 核 文件中的 組態。為了避免填入多工緩衝處理程式,並需要增加配置的記憶體,請將發佈請求限制為每秒不超過 100 個請求。
當您的應用程式需要以較高的速率傳送訊息或較大的訊息時,請考慮使用 串流管理員 傳送訊息至 Kinesis Data Streams。串流管理員元件旨在將大量資料傳輸至 AWS 雲端。如需詳細資訊,請參閱管理 Greengrass 核心裝置上的資料串流。
請求
此操作的請求具有下列參數:
topicName
(Python:topic_name
)-
要發佈訊息的主題。
qos
-
要使用的 MQTT QoS。此列舉
QOS
具有下列值:-
AT_MOST_ONCE
– QoS 0。MQTT 訊息最多傳遞一次。 -
AT_LEAST_ONCE
– QoS 1。MQTT 訊息至少傳遞一次。
-
payload
-
(選用) 訊息承載做為 Blob。
使用 MQTT 5 Greengrass 核時,下列功能適用於 2.10.0 版及更新版本。當您使用 MQTT 3.1.1 時,會忽略這些功能。下表列出存取這些功能時,您必須使用的 AWS IoT 最小裝置 SDK 版本。
SDK | 最低版本 |
---|---|
適用於 Python 的 AWS IoT Device SDK
v2 |
1.15.0 版 |
適用於 JAVA 的 AWS IoT Device SDK
v2 |
1.13.0 版 |
適用於 C++ 的 AWS IoT Device SDK
v2 |
1.24.0 版 |
適用於 JavaScript 的 AWS IoT Device SDK v2 |
1.13.0 版 |
payloadFormat
-
(選用) 訊息承載的格式。如果您未設定
payloadFormat
,則會假設類型為BYTES
。列舉具有下列值:-
BYTES
– 承載的內容是二進位 Blob。 -
UTF8
– 承載的內容是 UTF8 字元字串。
-
retain
-
(選用) 指出是否要在發佈
true
時將 MQTT 保留選項設定為 。 userProperties
-
(選用) 要傳送的應用程式特定
UserProperty
物件清單。UserProperty
物件定義如下:UserProperty: key: string value: string
messageExpiryIntervalSeconds
-
(選用) 訊息過期前的秒數,並由伺服器刪除。如果未設定此值,則訊息不會過期。
correlationData
-
(選用) 新增至請求的資訊,可用於將請求與回應建立關聯。
responseTopic
-
(選用) 應該用於回應訊息的主題。
contentType
-
(選用) 訊息內容類型的應用程式特定識別符。
回應
此操作不會在其回應中提供任何資訊。
範例
下列範例示範如何在自訂元件程式碼中呼叫此操作。
範例:發佈訊息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import java.nio.charset.StandardCharsets;
public class PublishToIoTCore {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
.withTopicName(topic)
.withPayload(message.getBytes(StandardCharsets.UTF_8))
.withQos(qos));
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
SubscribeToIoTCore
在主題或主題篩選條件 AWS IoT Core 上,從 訂閱 MQTT 訊息。 AWS IoT Greengrass 核心軟體會在元件達到其生命週期結束時移除訂閱。
此操作是一種訂閱操作,您可以在其中訂閱事件訊息串流。若要使用此操作,請定義串流回應處理常式,其中包含處理事件訊息、錯誤和串流關閉的函數。如需詳細資訊,請參閱訂閱 IPC 事件串流。
事件訊息類型: IoTCoreMessage
請求
此操作的請求具有下列參數:
topicName
(Python:topic_name
)-
要訂閱的主題。您可以使用 MQTT 主題萬用字元 (
#
和+
) 來訂閱多個主題。 qos
-
要使用的 MQTT QoS。此列舉
QOS
具有下列值:-
AT_MOST_ONCE
– QoS 0。MQTT 訊息最多傳遞一次。 -
AT_LEAST_ONCE
– QoS 1。MQTT 訊息至少傳遞一次。
-
回應
此操作的回應具有下列資訊:
messages
-
MQTT 訊息的串流。此物件
IoTCoreMessage
包含下列資訊:message
-
MQTT 訊息。此物件
MQTTMessage
包含下列資訊:topicName
(Python:topic_name
)-
訊息發佈的 主題。
payload
-
(選用) 訊息承載做為 Blob。
使用 MQTT 5 Greengrass 核時,下列功能適用於 2.10.0 版及更新版本。當您使用 MQTT 3.1.1 時,會忽略這些功能。下表列出存取這些功能時,您必須使用的 AWS IoT 最小裝置 SDK 版本。
SDK 最低版本 適用於 Python 的 AWS IoT Device SDK v2 1.15.0 版 適用於 JAVA 的 AWS IoT Device SDK v2 1.13.0 版 適用於 C++ 的 AWS IoT Device SDK v2 1.24.0 版 適用於 JavaScript 的 AWS IoT Device SDK v2 1.13.0 版 payloadFormat
-
(選用) 訊息承載的格式。如果您未設定
payloadFormat
,則會假設類型為BYTES
。列舉具有下列值:-
BYTES
– 承載的內容是二進位 Blob。 -
UTF8
– 承載的內容是 UTF8 字元字串。
-
retain
-
(選用) 指出是否要在發佈
true
時將 MQTT 保留選項設定為 。 userProperties
-
(選用) 要傳送的應用程式特定
UserProperty
物件清單。UserProperty
物件定義如下:UserProperty: key: string value: string
messageExpiryIntervalSeconds
-
(選用) 訊息過期前的秒數,並由伺服器刪除。如果未設定此值,則訊息不會過期。
correlationData
-
(選用) 新增至請求的資訊,可用於將請求與回應建立關聯。
responseTopic
-
(選用) 應該用於回應訊息的主題。
contentType
-
(選用) 訊息內容類型的應用程式特定識別符。
範例
下列範例示範如何在自訂元件程式碼中呼叫此操作。
範例:訂閱訊息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class SubscribeToIoTCore {
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
System.out.printf("Received new message on topic %s: %s%n",
ioTCoreMessage.getMessage().getTopicName(),
new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));
Optional<Function<Throwable, Boolean>> onStreamError =
Optional.of(e -> {
System.err.println("Received a stream error.");
e.printStackTrace();
return false;
});
Optional<Runnable> onStreamClosed = Optional.of(() ->
System.out.println("Subscribe to IoT Core stream closed."));
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
.withTopicName(topic)
.withQos(qos);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);
streamingResponse.getResponse();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
while (true) {
Thread.sleep(10000);
}
// To stop subscribing, close the stream.
streamingResponse.getHandler().closeStream();
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
範例
使用下列範例來了解如何在元件中使用 AWS IoT Core MQTT IPC 服務。
下列範例配方允許 元件發佈至所有主題。
下列 C++ 應用程式範例示範如何使用 AWS IoT Core MQTT IPC 服務來發佈訊息 AWS IoT Core。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
下列範例配方允許 元件訂閱所有主題。
下列範例 C++ 應用程式示範如何使用 AWS IoT Core MQTT IPC 服務來訂閱訊息 AWS IoT Core。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}