本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
發佈/訂閱訊 AWS IoT Core MQTT息
AWS IoT Core MQTT訊息傳送IPC服務可讓您傳送和接收訊MQTT息 AWS IoT Core。元件可以將郵件發佈至 AWS IoT Core 並訂閱主題,以對來自其他來源的MQTT郵件採取行動。如需有關 AWS IoT Core 實作的詳細資訊MQTT,請參閱AWS IoT Core 開發人員指南MQTT中的。
此消MQTT息傳遞IPC服務可讓您與之交換消息 AWS IoT Core。如需如何在元件之間交換郵件的相關資訊,請參閱發佈/訂閱本地訊息。
最低SDK版本
下表列出您必須使用 AWS IoT Device SDK 的最低版本來發行和訂閱MQTT郵件 AWS IoT Core。
授權
若要在自訂元件中使用 AWS IoT Core MQTT訊息,您必須定義授權原則,以允許元件傳送和接收有關主題的訊息。如需有關定義授權原則的資訊,請參閱授權元件執行IPC作業。
AWS IoT Core MQTT郵件的授權原則具有下列屬性。
IPC服務標識符:aws.greengrass.ipc.mqttproxy
作業 |
描述 |
資源 |
aws.greengrass#PublishToIoTCore
|
允許元件根據您指定 AWS IoT Core 的MQTT主題將訊息發佈至。
|
主題字串,例如test/topic ,或* 允許存取所有主題。您可以使用MQTT主題萬用字元 (# 和+ ) 來比對多個資源。
|
aws.greengrass#SubscribeToIoTCore
|
允許元件訂閱您指 AWS IoT Core 定主題的訊息。
|
主題字串,例如test/topic ,或* 允許存取所有主題。您可以使用MQTT主題萬用字元 (# 和+ ) 來比對多個資源。
|
*
|
允許元件針對您指定的主題發佈和訂閱 AWS IoT Core MQTT訊息。
|
主題字串,例如test/topic ,或* 允許存取所有主題。您可以使用MQTT主題萬用字元 (# 和+ ) 來比對多個資源。
|
MQTT AWS IoT Core
MQTT授權原則中的萬用字元
您可以在 AWS IoT Core MQTTIPC授權原則中使用MQTT萬用字元。元件可以發佈和訂閱符合授權原則中允許的主題篩選器的主題。例如,如果元件的授權原則授與存取權test/topic/#
,則該元件可以訂閱test/topic/#
,而且可以發佈和訂閱test/topic/filter
。
AWS IoT Core MQTT授權政策中的配方變數
如果您使用 v2.6.0 或更新版本的 Greengrass 核心,您可以在授權政策中{iot:thingName}
使用 recipe 變數。此功能可讓您為一組核心裝置設定單一授權原則,其中每個核心裝置只能存取包含自己名稱的主題。例如,您可以允許元件存取下列主題資源。
devices/{iot:thingName}/messages
如需詳細資訊,請參閱 配方變數 和 在合併更新中使用配方變數。
授權政策範例
您可以參考下列授權原則範例,協助您設定元件的授權原則。
範例 無限制存取權限的授權原則範例
下列範例授權原則可讓元件發佈和訂閱所有主題。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.MyIoTCorePubSubComponent:mqttproxy:1:
policyDescription: Allows access to publish/subscribe to all topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
範例 限制存取權限的授權原則範例
下列範例授權原則可讓元件發行和訂閱兩個名為factory/1/events
和的主題factory/1/actions
。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1":
policyDescription: Allows access to publish/subscribe to factory 1 topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- factory/1/actions
- factory/1/events
範例 一組核心裝置的授權原則範例
下列範例授權原則可讓元件發佈並訂閱包含執行元件之核心裝置名稱的主題。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1":
policyDescription: Allows access to publish/subscribe to all topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- factory/1/devices/{iot:thingName}/controls
PublishToIoTCore
針 AWS IoT Core 對主題將MQTT訊息發佈至。
當您將MQTT郵件發佈到時 AWS IoT Core,每秒會有 100 筆交易的配額。如果您超出此配額,訊息就會排入佇列,以便在 Greengrass 裝置上進行處理。此外,每秒還有 512 Kb 的資料配額,以及每秒發佈 20,000 個帳戶的配額 (部分 AWS 區域為 2,000 個)。如需中訊MQTT息代理程式限制的相關資訊 AWS IoT Core,請參閱訊AWS IoT Core 息代理程式和通訊協定限制與配額。
如果您超過這些配額,Greengrass 裝置會限制將訊息發佈到。 AWS IoT Core消息存儲在內存中的後台處理程序中。根據預設,分配給多工緩衝處理程式的記憶體為 2.5 Mb。如果多工緩衝處理程式填滿,則會拒絕新郵件。您可以增加多工緩衝處理程式的大小。如需詳細資訊,請參閱 Greengrass 核 文件中的 組態。為了避免填充多工緩衝處理程序並需要增加分配的內存,請將發布請求限制為每秒不超過 100 個請求。
當您的應用程式需要以較高的速率或較大的訊息傳送訊息時,請考慮使用將訊息傳送串流管理員至 Kinesis Data Streams。串流管理員元件旨在將大容量資料傳輸到 AWS 雲端. 如需詳細資訊,請參閱管理核心裝 Greengrass 資料串流。
請求
此操作的請求具有以下參數:
topicName
(Python:topic_name
)
-
要發佈郵件的目標主題。
qos
-
要使用的 MQTT QoS。此枚舉具有以下值:QOS
payload
-
(選擇性) 訊息承載為 blob。
使用 5 Greengrass 核 時,v2.10.0 及更新版本可以使用下列功能。MQTT當您使用 MQTT 3.1.1 時,會忽略這些功能。下表列出存取這些功能所必須SDK使用的 AWS IoT 裝置最低版本。
payloadFormat
-
(選擇性) 訊息承載的格式。如果您未設定payloadFormat
,則會假設類型為BYTES
。枚舉具有以下值:
-
BYTES
— 有效負載的內容是二進位 blob。
-
UTF8
— 有效負載的內容是字元字UTF8串。
retain
-
(選擇性) 指出發佈true
時是否將MQTT保留選項設定為。
userProperties
-
(選擇性) 要傳送的應用程式特定UserProperty
物件清單。物UserProperty
件定義如下:
UserProperty:
key: string
value: string
messageExpiryIntervalSeconds
-
(選擇性) 郵件到期前的秒數,並由伺服器刪除。如果未設定此值,則訊息不會過期。
correlationData
-
(選擇性) 新增至要求的資訊,可用來建立要求與回應的關聯。
responseTopic
-
(選擇性) 應該用於回應訊息的主題。
contentType
-
(選擇性) 訊息內容類型的應用程式特定識別碼。
回應
此操作在其響應中不提供任何信息。
範例
下列範例示範如何在自訂元件程式碼中呼叫此作業。
- Java (IPC client V2)
-
範例:發佈訊息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import java.nio.charset.StandardCharsets;
public class PublishToIoTCore {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
.withTopicName(topic)
.withPayload(message.getBytes(StandardCharsets.UTF_8))
.withQos(qos));
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
- Python (IPC client V2)
-
範例:發佈訊息
這個範例假設您使用的是適用 AWS IoT Device SDK 於 Python v2 的 1.5.4 版或更新版本。
import awsiot.greengrasscoreipc.clientv2 as clientV2
topic = 'my/topic'
qos = '1'
payload = 'Hello, World'
ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload)
ipc_client.close()
- Java (IPC client V1)
-
範例:發佈訊息
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PublishToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
PublishToIoTCoreResponseHandler responseHandler =
PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos);
CompletableFuture<PublishToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
throw e;
}
}
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) {
PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest();
publishToIoTCoreRequest.setTopicName(topic);
publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8));
publishToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty());
}
}
- Python (IPC client V1)
-
範例:發佈訊息
這個範例假設您使用的是適用 AWS IoT Device SDK 於 Python v2 的 1.5.4 版或更新版本。
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
topic = "my/topic"
message = "Hello, World"
qos = QOS.AT_LEAST_ONCE
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = bytes(message, "utf-8")
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
- C++
-
範例:發佈訊息
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String message("Hello, World!");
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
- JavaScript
-
範例:發佈訊息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
class PublishToIoTCore {
private ipcClient: greengrasscoreipc.Client
private readonly topic: string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.publishToIoTCore().then(r => console.log("Started workflow"));
}
private async publishToIoTCore() {
try {
const request: PublishToIoTCoreRequest = {
topicName: this.topic,
qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
}
this.ipcClient = await getIpcClient();
await this.ipcClient.publishToIoTCore(request);
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const publishToIoTCore = new PublishToIoTCore();
SubscribeToIoTCore
訂閱來自 AWS IoT Core 主題或主題篩選器的MQTT郵件。 AWS IoT Greengrass 核心軟體會在元件到達其生命週期結束時移除訂閱。
此作業是訂閱作業,您可以在其中訂閱事件訊息串流。若要使用此作業,請定義具有處理事件訊息、錯誤和資料流結束之函數的串流回應處理常式。如需詳細資訊,請參閱訂閱IPC事件串流。
事件訊息類型:IoTCoreMessage
請求
此操作的請求具有以下參數:
topicName
(Python:topic_name
)
-
要訂閱的主題。您可以使用MQTT主題萬用字元 (#
和+
) 來訂閱多個主題。
qos
-
要使用的 MQTT QoS。此枚舉具有以下值:QOS
回應
此作業的回應包含下列資訊:
messages
-
MQTT訊息串流。此物件IoTCoreMessage
包含下列資訊:
message
-
該MQTT消息。此物件MQTTMessage
包含下列資訊:
topicName
(Python:topic_name
)
-
郵件發行目標的主題。
payload
-
(選擇性) 訊息承載為 blob。
使用 5 Greengrass 核 時,v2.10.0 及更新版本可以使用下列功能。MQTT當您使用 MQTT 3.1.1 時,會忽略這些功能。下表列出存取這些功能所必須SDK使用的 AWS IoT 裝置最低版本。
payloadFormat
-
(選擇性) 訊息承載的格式。如果您未設定payloadFormat
,則會假設類型為BYTES
。枚舉具有以下值:
-
BYTES
— 有效負載的內容是二進位 blob。
-
UTF8
— 有效負載的內容是字元字UTF8串。
retain
-
(選擇性) 指出發佈true
時是否將MQTT保留選項設定為。
userProperties
-
(選擇性) 要傳送的應用程式特定UserProperty
物件清單。物UserProperty
件定義如下:
UserProperty:
key: string
value: string
messageExpiryIntervalSeconds
-
(選擇性) 郵件到期前的秒數,並由伺服器刪除。如果未設定此值,則訊息不會過期。
correlationData
-
(選擇性) 新增至要求的資訊,可用來建立要求與回應的關聯。
responseTopic
-
(選擇性) 應該用於回應訊息的主題。
contentType
-
(選擇性) 訊息內容類型的應用程式特定識別碼。
範例
下列範例示範如何在自訂元件程式碼中呼叫此作業。
- Java (IPC client V2)
-
範例:訂閱訊息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class SubscribeToIoTCore {
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
System.out.printf("Received new message on topic %s: %s%n",
ioTCoreMessage.getMessage().getTopicName(),
new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));
Optional<Function<Throwable, Boolean>> onStreamError =
Optional.of(e -> {
System.err.println("Received a stream error.");
e.printStackTrace();
return false;
});
Optional<Runnable> onStreamClosed = Optional.of(() ->
System.out.println("Subscribe to IoT Core stream closed."));
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
.withTopicName(topic)
.withQos(qos);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);
streamingResponse.getResponse();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
while (true) {
Thread.sleep(10000);
}
// To stop subscribing, close the stream.
streamingResponse.getHandler().closeStream();
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
- Python (IPC client V2)
-
範例:訂閱訊息
這個範例假設您使用的是適用 AWS IoT Device SDK 於 Python v2 的 1.5.4 版或更新版本。
import threading
import traceback
import awsiot.greengrasscoreipc.clientv2 as clientV2
topic = 'my/topic'
qos = '1'
def on_stream_event(event):
try:
topic_name = event.message.topic_name
message = str(event.message.payload, 'utf-8')
print(f'Received new message on topic {topic_name}: {message}')
except:
traceback.print_exc()
def on_stream_error(error):
# Return True to close stream, False to keep stream open.
return True
def on_stream_closed():
pass
ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp, operation = ipc_client.subscribe_to_iot_core(
topic_name=topic,
qos=qos,
on_stream_event=on_stream_event,
on_stream_error=on_stream_error,
on_stream_closed=on_stream_closed
)
# Keep the main thread alive, or the process will exit.
event = threading.Event()
event.wait()
# To stop subscribing, close the operation stream.
operation.close()
ipc_client.close()
- Java (IPC client V1)
-
範例:訂閱訊息
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
StreamResponseHandler<IoTCoreMessage> streamResponseHandler =
new SubscriptionResponseHandler();
SubscribeToIoTCoreResponseHandler responseHandler =
SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos,
streamResponseHandler);
CompletableFuture<SubscribeToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
throw e;
}
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) {
SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest();
subscribeToIoTCoreRequest.setTopicName(topic);
subscribeToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest,
Optional.of(streamResponseHandler));
}
public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> {
@Override
public void onStreamEvent(IoTCoreMessage ioTCoreMessage) {
try {
String topic = ioTCoreMessage.getMessage().getTopicName();
String message = new String(ioTCoreMessage.getMessage().getPayload(),
StandardCharsets.UTF_8);
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false;
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to IoT Core stream closed.");
}
}
}
- Python (IPC client V1)
-
範例:訂閱訊息
這個範例假設您使用的是適用 AWS IoT Device SDK 於 Python v2 的 1.5.4 版或更新版本。
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
try:
message = str(event.message.payload, "utf-8")
topic_name = event.message.topic_name
# Handle message.
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
# Handle error.
return True # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
# Handle close.
pass
topic = "my/topic"
qos = QOS.AT_MOST_ONCE
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
# Keep the main thread alive, or the process will exit.
while True:
time.sleep(10)
# To stop subscribing, close the operation stream.
operation.close()
- C++
-
範例:訂閱訊息
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string topicName = message.value().GetTopicName().value().c_str();
// Handle message.
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
範例:訂閱訊息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToIoTCore {
private ipcClient: greengrasscoreipc.Client
private readonly topic: string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToIoTCore().then(r => console.log("Started workflow"));
}
private async subscribeToIoTCore() {
try {
const request: SubscribeToIoTCoreRequest = {
topicName: this.topic,
qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
}
this.ipcClient = await getIpcClient();
const streamingOperation = this.ipcClient.subscribeToIoTCore(request);
streamingOperation.on('message', (message: IoTCoreMessage) => {
// parse the message depending on your use cases, e.g.
if (message.message && message.message.payload) {
const receivedMessage = message.message.payload.toString();
}
});
streamingOperation.on('streamError', (error : RpcError) => {
// define your own error handling logic
});
streamingOperation.on('ended', () => {
// define your own logic
});
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToIoTCore = new SubscribeToIoTCore();
範例
使用下列範例來瞭解如何在元件中使用 AWS IoT Core MQTTIPC服務。
下列範例方案可讓元件發佈至所有主題。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCorePublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes MQTT messages to IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCorePublisherCpp:mqttproxy:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_iotcore_publisher"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCorePublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes MQTT messages to IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCorePublisherCpp:mqttproxy:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_iotcore_publisher"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher
Permission:
Execute: OWNER
以下實例 C ++ 應用程序演示瞭如何使用該 AWS IoT Core MQTTIPC服務將消息發布到 AWS IoT Core。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
下列範例方案可讓元件訂閱所有主題。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCoreSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCoreSubscriberCpp:mqttproxy:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_iotcore_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCoreSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to MQTT messages from IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCoreSubscriberCpp:mqttproxy:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
run: "{artifacts:path}/greengrassv2_iotcore_subscriber"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber
Permission:
Execute: OWNER
以下示例 C ++ 應用程序演示瞭如何使用該 AWS IoT Core MQTTIPC服務來訂閱來自的消息 AWS IoT Core。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}