Publish/subscribe AWS IoT Core MQTT messages - AWS IoT Greengrass

Publish/subscribe AWS IoT Core MQTT messages

The AWS IoT Core MQTT messaging IPC service lets you send and receive MQTT messages to and from AWS IoT Core. Components can publish messages to AWS IoT Core and subscribe to topics to act on MQTT messages from other sources. For more information about the AWS IoT Core implementation of MQTT, see MQTT in the AWS IoT Core Developer Guide.

Note

This MQTT messaging IPC service lets you exchange messages with AWS IoT Core. For more information about how to exchange messages between components, see Publish/subscribe local messages.

Minimum SDK versions

The following table lists the minimum versions of the AWS IoT Device SDK that you must use to publish and subscribe to MQTT messages to and from AWS IoT Core.

Authorization

To use AWS IoT Core MQTT messaging in a custom component, you must define authorization policies that allow your component to send and receive messages on topics. For information about defining authorization policies, see Authorize components to perform IPC operations.

Authorization policies for AWS IoT Core MQTT messaging have the following properties.

IPC service identifier: aws.greengrass.ipc.mqttproxy

aws.greengrass#PublishToIoTCore

Description: Allows a component to publish messages to AWS IoT Core on the MQTT topics that you specify.

Resources: A topic string, such as test/topic, or * to allow access to all topics. You can use MQTT topic wildcards (# and +) to match multiple resources.

aws.greengrass#SubscribeToIoTCore

Description: Allows a component to subscribe to messages from AWS IoT Core on the topics that you specify.

Resources: A topic string, such as test/topic, or * to allow access to all topics. You can use MQTT topic wildcards (# and +) to match multiple resources.

*

Description: Allows a component to publish and subscribe to AWS IoT Core MQTT messages for the topics that you specify.

Resources: A topic string, such as test/topic, or * to allow access to all topics. You can use MQTT topic wildcards (# and +) to match multiple resources.

MQTT wildcards in AWS IoT Core MQTT authorization policies

You can use MQTT wildcards in AWS IoT Core MQTT IPC authorization policies. Components can publish and subscribe to topics that match the topic filter that you allow in an authorization policy. For example, if a component's authorization policy grants access to test/topic/#, the component can subscribe to test/topic/#, and it can publish and subscribe to test/topic/filter.
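To make the wildcard behavior concrete, the following is a minimal sketch of standard MQTT topic filter matching, where + matches exactly one level and # matches all remaining levels. The function name topic_matches is hypothetical, and this is not the Greengrass nucleus authorization code. It only checks a concrete topic against a filter.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a concrete topic matches an MQTT topic filter.

    Hypothetical helper for illustration; it follows the general MQTT
    wildcard rules, not the actual Greengrass authorization logic.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # The multi-level wildcard is only valid as the last level.
            # It matches every remaining level, including none.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Without "#", the filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    print(topic_matches("test/topic/#", "test/topic/filter"))
    print(topic_matches("test/+/filter", "test/topic/filter"))
    print(topic_matches("test/topic/#", "other/topic"))
```

With the filter test/topic/# from the example above, the topic test/topic/filter matches, while other/topic does not.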

Recipe variables in AWS IoT Core MQTT authorization policies

If you use v2.6.0 or later of the Greengrass nucleus, you can use the {iot:thingName} recipe variable in authorization policies. This feature lets you configure a single authorization policy for a group of core devices, where each core device can access only topics that contain its own name. For example, you can allow a component access to the following topic resource.

devices/{iot:thingName}/messages

For more information, see Recipe variables and Use recipe variables in merge updates.
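As a rough illustration of the effect of the {iot:thingName} recipe variable, the sketch below substitutes a thing name into a topic resource. The helper expand_thing_name and the thing name MyCoreDevice are hypothetical. On a real core device the Greengrass nucleus performs this substitution, so this code only shows the resulting topic.

```python
THING_NAME_VARIABLE = "{iot:thingName}"


def expand_thing_name(resource: str, thing_name: str) -> str:
    """Replace the {iot:thingName} placeholder with a core device's name.

    Hypothetical helper for illustration only.
    """
    return resource.replace(THING_NAME_VARIABLE, thing_name)


if __name__ == "__main__":
    # "MyCoreDevice" is an assumed thing name.
    print(expand_thing_name("devices/{iot:thingName}/messages", "MyCoreDevice"))
```

Each core device in the group ends up with a different effective resource, even though they all share one policy.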

Authorization policy examples

You can reference the following authorization policy examples to help you configure authorization policies for your components.

Example: Authorization policy with unrestricted access

The following example authorization policy allows a component to publish and subscribe to all topics.

JSON
{
  "accessControl": {
    "aws.greengrass.ipc.mqttproxy": {
      "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
        "policyDescription": "Allows access to publish/subscribe to all topics.",
        "operations": [
          "aws.greengrass#PublishToIoTCore",
          "aws.greengrass#SubscribeToIoTCore"
        ],
        "resources": [
          "*"
        ]
      }
    }
  }
}
YAML
---
accessControl:
  aws.greengrass.ipc.mqttproxy:
    com.example.MyIoTCorePubSubComponent:mqttproxy:1:
      policyDescription: Allows access to publish/subscribe to all topics.
      operations:
        - aws.greengrass#PublishToIoTCore
        - aws.greengrass#SubscribeToIoTCore
      resources:
        - "*"
Example: Authorization policy with limited access

The following example authorization policy allows a component to publish and subscribe to two topics named factory/1/events and factory/1/actions.

JSON
{
  "accessControl": {
    "aws.greengrass.ipc.mqttproxy": {
      "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
        "policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
        "operations": [
          "aws.greengrass#PublishToIoTCore",
          "aws.greengrass#SubscribeToIoTCore"
        ],
        "resources": [
          "factory/1/actions",
          "factory/1/events"
        ]
      }
    }
  }
}
YAML
---
accessControl:
  aws.greengrass.ipc.mqttproxy:
    "com.example.MyIoTCorePubSubComponent:mqttproxy:1":
      policyDescription: Allows access to publish/subscribe to factory 1 topics.
      operations:
        - aws.greengrass#PublishToIoTCore
        - aws.greengrass#SubscribeToIoTCore
      resources:
        - factory/1/actions
        - factory/1/events
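To show how a policy of this shape can be read, the following sketch parses the limited-access policy and checks whether an operation on a topic is covered. The function is_allowed is hypothetical and deliberately simplified: it handles only exact topic strings and the * resource, not MQTT wildcards or recipe variables, and it is not the logic that the Greengrass nucleus uses.

```python
import json

# The limited-access policy from the example above.
POLICY = json.loads("""
{
  "accessControl": {
    "aws.greengrass.ipc.mqttproxy": {
      "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
        "policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
        "operations": [
          "aws.greengrass#PublishToIoTCore",
          "aws.greengrass#SubscribeToIoTCore"
        ],
        "resources": ["factory/1/actions", "factory/1/events"]
      }
    }
  }
}
""")


def is_allowed(config: dict, operation: str, topic: str) -> bool:
    """Check an operation and topic against the mqttproxy policies.

    Hypothetical, simplified check: exact topic strings and "*" only.
    """
    policies = config["accessControl"]["aws.greengrass.ipc.mqttproxy"]
    for policy in policies.values():
        operations = policy.get("operations", [])
        resources = policy.get("resources", [])
        operation_ok = operation in operations or "*" in operations
        resource_ok = topic in resources or "*" in resources
        if operation_ok and resource_ok:
            return True
    return False


if __name__ == "__main__":
    print(is_allowed(POLICY, "aws.greengrass#PublishToIoTCore", "factory/1/events"))
    print(is_allowed(POLICY, "aws.greengrass#PublishToIoTCore", "factory/2/events"))
```

A check like this can be handy in unit tests for a recipe, to catch a topic that a component uses but the policy does not list.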
Example: Authorization policy for a group of core devices

Important

This example uses a feature that is available for v2.6.0 and later of the Greengrass nucleus component. Greengrass nucleus v2.6.0 adds support for most recipe variables, such as {iot:thingName}, in component configurations.

The following example authorization policy allows a component to publish and subscribe to a topic that contains the name of the core device that runs the component.

JSON
{
  "accessControl": {
    "aws.greengrass.ipc.mqttproxy": {
      "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
        "policyDescription": "Allows access to publish/subscribe to all topics.",
        "operations": [
          "aws.greengrass#PublishToIoTCore",
          "aws.greengrass#SubscribeToIoTCore"
        ],
        "resources": [
          "factory/1/devices/{iot:thingName}/controls"
        ]
      }
    }
  }
}
YAML
---
accessControl:
  aws.greengrass.ipc.mqttproxy:
    "com.example.MyIoTCorePubSubComponent:mqttproxy:1":
      policyDescription: Allows access to publish/subscribe to all topics.
      operations:
        - aws.greengrass#PublishToIoTCore
        - aws.greengrass#SubscribeToIoTCore
      resources:
        - factory/1/devices/{iot:thingName}/controls

PublishToIoTCore

Publishes an MQTT message to AWS IoT Core on a topic.

When you publish MQTT messages to AWS IoT Core, there is a quota of 100 transactions per second. If you exceed this quota, messages are queued for processing on the Greengrass device. There is also a quota of 512 Kb of data per second and an account-wide quota of 20,000 publishes per second (2,000 in some AWS Regions). For more information about MQTT message broker limits in AWS IoT Core, see AWS IoT Core message broker and protocol limits and quotas.

If you exceed these quotas, the Greengrass device limits publishing messages to AWS IoT Core. Messages are stored in a spooler in memory. By default, the memory allocated to the spooler is 2.5 Mb. If the spooler fills up, new messages are rejected. You can increase the size of the spooler. For more information, see Configuration in the Greengrass nucleus documentation. To avoid filling the spooler and needing to increase the allocated memory, limit publish requests to no more than 100 requests per second.
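One way to stay within 100 requests per second is to pace publish calls on the component side. The following is a minimal sketch of such pacing. The class PublishRateLimiter is hypothetical and not part of the AWS IoT Device SDK. It spaces calls evenly, and the clock and sleep functions are injectable so that the behavior can be tested without waiting.

```python
import time
from typing import Callable


class PublishRateLimiter:
    """Space out calls so that at most max_per_second happen per second.

    Hypothetical helper for illustration; not an AWS IoT Device SDK class.
    """

    def __init__(
        self,
        max_per_second: float = 100.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self._interval = 1.0 / max_per_second
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = clock()

    def wait(self) -> float:
        """Block until the next call is allowed and return the delay used."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay > 0:
            self._sleep(delay)
        # Schedule the following call one interval after this one.
        self._next_allowed = max(now, self._next_allowed) + self._interval
        return delay


if __name__ == "__main__":
    limiter = PublishRateLimiter(max_per_second=100)
    for i in range(5):
        limiter.wait()
        # A real component would publish here, for example with the
        # publish_to_iot_core call of the IPC client.
        print(f"publish {i}")
```

Calling wait() before each publish request keeps the component below the configured rate, which in turn keeps the spooler from filling under steady load.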

If your application needs to send messages at a higher rate, or larger messages, consider using the stream manager to send messages to Kinesis Data Streams. The stream manager component is designed to transfer high-volume data to the AWS Cloud. For more information, see Manage data streams on Greengrass core devices.

Request

This operation's request has the following parameters:

topicName (Python: topic_name)

The topic to which to publish the message.

qos

The MQTT QoS to use. This enum, QOS, has the following values:

  • AT_MOST_ONCE – QoS 0. The MQTT message is delivered at most once.

  • AT_LEAST_ONCE – QoS 1. The MQTT message is delivered at least once.

payload

(Optional) The message payload as a blob.

The following features are available for v2.10.0 and later of the Greengrass nucleus when using MQTT 5. These features are ignored when you use MQTT 3.1.1. The following table lists the minimum version of the AWS IoT Device SDK that you must use to access these features.

payloadFormat

(Optional) The format of the message payload. If you don't set payloadFormat, the type is assumed to be BYTES. The enum has the following values:

  • BYTES – The content of the payload is a binary blob.

  • UTF8 – The content of the payload is a UTF8 string of characters.

retain

(Optional) Indicates whether to set the MQTT retain option to true when publishing.

userProperties

(Optional) A list of application-specific UserProperty objects to send. The UserProperty object is defined as follows:

UserProperty:
  key: string
  value: string

messageExpiryIntervalSeconds

(Optional) The number of seconds before the message expires and is deleted by the server. If this value isn't set, the message doesn't expire.

correlationData

(Optional) Information added to the request that can be used to associate a request with a response.

responseTopic

(Optional) The topic that should be used for the response message.

contentType

(Optional) An application-specific identifier of the content type of the message.

Response

This operation doesn't provide any information in its response.

Examples

The following examples demonstrate how to call this operation in custom component code.

Java (IPC client V2)
Example: Publish a message
package com.aws.greengrass.docs.samples.ipc;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;

import java.nio.charset.StandardCharsets;

public class PublishToIoTCore {
    public static void main(String[] args) {
        String topic = args[0];
        String message = args[1];
        QOS qos = QOS.get(args[2]);

        try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
            ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
                    .withTopicName(topic)
                    .withPayload(message.getBytes(StandardCharsets.UTF_8))
                    .withQos(qos));
            System.out.println("Successfully published to topic: " + topic);
        } catch (Exception e) {
            System.err.println("Exception occurred.");
            e.printStackTrace();
            System.exit(1);
        }
    }
}
Python (IPC client V2)
Example: Publish a message
Note

This example assumes that you are using version 1.5.4 or later of the AWS IoT Device SDK for Python v2.

import awsiot.greengrasscoreipc.clientv2 as clientV2

topic = 'my/topic'
qos = '1'
payload = 'Hello, World'

ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload)
ipc_client.close()
Java (IPC client V1)
Example: Publish a message
Note

This example uses an IPCUtils class to create a connection to the AWS IoT Greengrass Core IPC service. For more information, see Connect to the AWS IoT Greengrass Core IPC service.

package com.aws.greengrass.docs.samples.ipc;

import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PublishToIoTCore {
    public static final int TIMEOUT_SECONDS = 10;

    public static void main(String[] args) {
        String topic = args[0];
        String message = args[1];
        QOS qos = QOS.get(args[2]);
        try (EventStreamRPCConnection eventStreamRPCConnection =
                     IPCUtils.getEventStreamRpcConnection()) {
            GreengrassCoreIPCClient ipcClient =
                    new GreengrassCoreIPCClient(eventStreamRPCConnection);
            PublishToIoTCoreResponseHandler responseHandler =
                    PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos);
            CompletableFuture<PublishToIoTCoreResponse> futureResponse =
                    responseHandler.getResponse();
            try {
                futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
                System.out.println("Successfully published to topic: " + topic);
            } catch (TimeoutException e) {
                System.err.println("Timeout occurred while publishing to topic: " + topic);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnauthorizedError) {
                    System.err.println("Unauthorized error while publishing to topic: " + topic);
                } else {
                    throw e;
                }
            }
        } catch (InterruptedException e) {
            System.out.println("IPC interrupted.");
        } catch (ExecutionException e) {
            System.err.println("Exception occurred when using IPC.");
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(
            GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) {
        PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest();
        publishToIoTCoreRequest.setTopicName(topic);
        publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8));
        publishToIoTCoreRequest.setQos(qos);
        return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty());
    }
}
Python (IPC client V1)
Example: Publish a message
Note

This example assumes that you are using version 1.5.4 or later of the AWS IoT Device SDK for Python v2.

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    QOS,
    PublishToIoTCoreRequest
)

TIMEOUT = 10

ipc_client = awsiot.greengrasscoreipc.connect()

topic = "my/topic"
message = "Hello, World"
qos = QOS.AT_LEAST_ONCE

request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = bytes(message, "utf-8")
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
C++
Example: Publish a message
#include <iostream>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    String message("Hello, World!");
    String topic("my/topic");
    QOS qos = QOS_AT_MOST_ONCE;
    int timeout = 10;

    PublishToIoTCoreRequest request;
    Vector<uint8_t> messageData({message.begin(), message.end()});
    request.SetTopicName(topic);
    request.SetPayload(messageData);
    request.SetQos(qos);

    auto operation = ipcClient.NewPublishToIoTCore();
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            (void)error;
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
    }

    return 0;
}
JavaScript
Example: Publish a message
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";

class PublishToIoTCore {
    private ipcClient: greengrasscoreipc.Client
    private readonly topic: string;

    constructor() {
        // define your own constructor, e.g.
        this.topic = "<define_your_topic>";
        this.publishToIoTCore().then(r => console.log("Started workflow"));
    }

    private async publishToIoTCore() {
        try {
            const request: PublishToIoTCoreRequest = {
                topicName: this.topic,
                qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
            }

            this.ipcClient = await getIpcClient();

            await this.ipcClient.publishToIoTCore(request);
        } catch (e) {
            // parse the error depending on your use cases
            throw e
        }
    }
}

export async function getIpcClient(){
    try {
        const ipcClient = greengrasscoreipc.createClient();
        await ipcClient.connect()
            .catch(error => {
                // parse the error depending on your use cases
                throw error;
            });
        return ipcClient
    } catch (err) {
        // parse the error depending on your use cases
        throw err
    }
}

// starting point
const publishToIoTCore = new PublishToIoTCore();

SubscribeToIoTCore

Subscribes to MQTT messages from AWS IoT Core on a topic or topic filter. The AWS IoT Greengrass Core software removes subscriptions when the component reaches the end of its lifecycle.

This operation is a subscription operation where you subscribe to a stream of event messages. To use this operation, define a stream response handler with functions that handle event messages, errors, and stream closure. For more information, see Subscribe to IPC event streams.
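The three handler functions can be sketched independently of the SDK. The following minimal example defines a plain class with the same method names that the Python examples on this page use (on_stream_event, on_stream_error, on_stream_closed) and drives it with a stand-in event object. MessageCollector and fake_event are hypothetical names. A real handler would be passed to the IPC client rather than called by hand.

```python
from types import SimpleNamespace
from typing import List, Tuple


class MessageCollector:
    """Collect received messages; a stand-in for a stream response handler.

    Hypothetical class for illustration; it does not subclass any SDK type.
    """

    def __init__(self) -> None:
        self.received: List[Tuple[str, str]] = []
        self.closed = False

    def on_stream_event(self, event) -> None:
        # Each event carries an MQTT message with a topic name and payload.
        topic_name = event.message.topic_name
        text = str(event.message.payload, "utf-8")
        self.received.append((topic_name, text))

    def on_stream_error(self, error: Exception) -> bool:
        # Return True to close the stream, False to keep it open.
        return True

    def on_stream_closed(self) -> None:
        self.closed = True


def fake_event(topic_name: str, payload: bytes) -> SimpleNamespace:
    """Build an object shaped like an IoTCoreMessage, for local testing only."""
    return SimpleNamespace(
        message=SimpleNamespace(topic_name=topic_name, payload=payload)
    )


if __name__ == "__main__":
    collector = MessageCollector()
    collector.on_stream_event(fake_event("my/topic", b"Hello, World"))
    print(collector.received)
```

Keeping the handler logic in small methods like these makes it possible to test message handling without a running Greengrass core device.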

Event message type: IoTCoreMessage

Request

This operation's request has the following parameters:

topicName (Python: topic_name)

The topic to which to subscribe. You can use MQTT topic wildcards (# and +) to subscribe to multiple topics.

qos

The MQTT QoS to use. This enum, QOS, has the following values:

  • AT_MOST_ONCE – QoS 0. The MQTT message is delivered at most once.

  • AT_LEAST_ONCE – QoS 1. The MQTT message is delivered at least once.

Response

This operation's response has the following information:

messages

The stream of MQTT messages. This object, IoTCoreMessage, contains the following information:

message

The MQTT message. This object, MQTTMessage, contains the following information:

topicName (Python: topic_name)

The topic to which the message was published.

payload

(Optional) The message payload as a blob.

The following features are available for v2.10.0 and later of the Greengrass nucleus when using MQTT 5. These features are ignored when you use MQTT 3.1.1. The following table lists the minimum version of the AWS IoT Device SDK that you must use to access these features.

payloadFormat

(Optional) The format of the message payload. If you don't set payloadFormat, the type is assumed to be BYTES. The enum has the following values:

  • BYTES – The content of the payload is a binary blob.

  • UTF8 – The content of the payload is a UTF8 string of characters.

retain

(Optional) Indicates whether to set the MQTT retain option to true when publishing.

userProperties

(Optional) A list of application-specific UserProperty objects to send. The UserProperty object is defined as follows:

UserProperty:
  key: string
  value: string

messageExpiryIntervalSeconds

(Optional) The number of seconds before the message expires and is deleted by the server. If this value isn't set, the message doesn't expire.

correlationData

(Optional) Information added to the request that can be used to associate a request with a response.

responseTopic

(Optional) The topic that should be used for the response message.

contentType

(Optional) An application-specific identifier of the content type of the message.
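A subscriber can use the payload format to decide how to interpret a received payload. The following is a small sketch under stated assumptions: decode_payload is a hypothetical helper, and the strings "BYTES" and "UTF8" stand for the enum value names listed above, which may differ from the values that the SDK enum uses on the wire. An unset format is treated as BYTES, as described above.

```python
from typing import Optional, Union


def decode_payload(
    payload: bytes, payload_format: Optional[str] = None
) -> Union[str, bytes]:
    """Interpret a received payload according to its payload format.

    Hypothetical helper for illustration. The format is given by name
    ("BYTES" or "UTF8"); an unset format is treated as BYTES.
    """
    effective_format = payload_format or "BYTES"
    if effective_format == "UTF8":
        # The publisher declared the payload to be UTF-8 text.
        return payload.decode("utf-8")
    # Binary payloads are returned unchanged.
    return payload


if __name__ == "__main__":
    print(decode_payload(b"Hello, World", "UTF8"))
    print(decode_payload(b"\x00\x01\x02"))
```

Returning the raw bytes for the BYTES case leaves any further parsing, such as JSON or a binary protocol, to the caller.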

Examples

The following examples demonstrate how to call this operation in custom component code.

Java (IPC client V2)
Example: Subscribe to messages
package com.aws.greengrass.docs.samples.ipc;

import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

public class SubscribeToIoTCore {

    public static void main(String[] args) {
        String topic = args[0];
        QOS qos = QOS.get(args[1]);

        Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
                System.out.printf("Received new message on topic %s: %s%n",
                        ioTCoreMessage.getMessage().getTopicName(),
                        new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));

        Optional<Function<Throwable, Boolean>> onStreamError =
                Optional.of(e -> {
                    System.err.println("Received a stream error.");
                    e.printStackTrace();
                    return false;
                });

        Optional<Runnable> onStreamClosed = Optional.of(() ->
                System.out.println("Subscribe to IoT Core stream closed."));

        try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
            SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
                    .withTopicName(topic)
                    .withQos(qos);

            GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
                    streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);

            streamingResponse.getResponse();
            System.out.println("Successfully subscribed to topic: " + topic);

            // Keep the main thread alive, or the process will exit.
            while (true) {
                Thread.sleep(10000);
            }

            // To stop subscribing, close the stream. This call is commented out
            // because a statement after an infinite loop is unreachable in Java.
            // streamingResponse.getHandler().closeStream();
        } catch (InterruptedException e) {
            System.out.println("Subscribe interrupted.");
        } catch (Exception e) {
            System.err.println("Exception occurred.");
            e.printStackTrace();
            System.exit(1);
        }
    }
}
Python (IPC client V2)
Example: Subscribe to messages
Note

This example assumes that you are using version 1.5.4 or later of the AWS IoT Device SDK for Python v2.

import threading
import traceback

import awsiot.greengrasscoreipc.clientv2 as clientV2

topic = 'my/topic'
qos = '1'

def on_stream_event(event):
    try:
        topic_name = event.message.topic_name
        message = str(event.message.payload, 'utf-8')
        print(f'Received new message on topic {topic_name}: {message}')
    except:
        traceback.print_exc()

def on_stream_error(error):
    # Return True to close stream, False to keep stream open.
    return True

def on_stream_closed():
    pass

ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp, operation = ipc_client.subscribe_to_iot_core(
    topic_name=topic,
    qos=qos,
    on_stream_event=on_stream_event,
    on_stream_error=on_stream_error,
    on_stream_closed=on_stream_closed
)

# Keep the main thread alive, or the process will exit.
event = threading.Event()
event.wait()

# To stop subscribing, close the operation stream.
operation.close()
ipc_client.close()
Java (IPC client V1)
Example: Subscribe to messages
Note

This example uses an IPCUtils class to create a connection to the AWS IoT Greengrass Core IPC service. For more information, see Connect to the AWS IoT Greengrass Core IPC service.

package com.aws.greengrass.docs.samples.ipc;

import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeToIoTCore {
    public static final int TIMEOUT_SECONDS = 10;

    public static void main(String[] args) {
        String topic = args[0];
        QOS qos = QOS.get(args[1]);
        try (EventStreamRPCConnection eventStreamRPCConnection =
                     IPCUtils.getEventStreamRpcConnection()) {
            GreengrassCoreIPCClient ipcClient =
                    new GreengrassCoreIPCClient(eventStreamRPCConnection);
            StreamResponseHandler<IoTCoreMessage> streamResponseHandler =
                    new SubscriptionResponseHandler();
            SubscribeToIoTCoreResponseHandler responseHandler =
                    SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos,
                            streamResponseHandler);
            CompletableFuture<SubscribeToIoTCoreResponse> futureResponse =
                    responseHandler.getResponse();
            try {
                futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
                System.out.println("Successfully subscribed to topic: " + topic);
            } catch (TimeoutException e) {
                System.err.println("Timeout occurred while subscribing to topic: " + topic);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnauthorizedError) {
                    System.err.println("Unauthorized error while subscribing to topic: " + topic);
                } else {
                    throw e;
                }
            }

            // Keep the main thread alive, or the process will exit.
            try {
                while (true) {
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                System.out.println("Subscribe interrupted.");
            }

            // To stop subscribing, close the stream.
            responseHandler.closeStream();
        } catch (InterruptedException e) {
            System.out.println("IPC interrupted.");
        } catch (ExecutionException e) {
            System.err.println("Exception occurred when using IPC.");
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(
            GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos,
            StreamResponseHandler<IoTCoreMessage> streamResponseHandler) {
        SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest();
        subscribeToIoTCoreRequest.setTopicName(topic);
        subscribeToIoTCoreRequest.setQos(qos);
        return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest,
                Optional.of(streamResponseHandler));
    }

    public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> {

        @Override
        public void onStreamEvent(IoTCoreMessage ioTCoreMessage) {
            try {
                String topic = ioTCoreMessage.getMessage().getTopicName();
                String message = new String(ioTCoreMessage.getMessage().getPayload(),
                        StandardCharsets.UTF_8);
                System.out.printf("Received new message on topic %s: %s%n", topic, message);
            } catch (Exception e) {
                System.err.println("Exception occurred while processing subscription response " +
                        "message.");
                e.printStackTrace();
            }
        }

        @Override
        public boolean onStreamError(Throwable error) {
            System.err.println("Received a stream error.");
            error.printStackTrace();
            return false;
        }

        @Override
        public void onStreamClosed() {
            System.out.println("Subscribe to IoT Core stream closed.");
        }
    }
}
Python (IPC client V1)
Example: Subscribe to messages
Note

This example assumes that you are using version 1.5.4 or later of the AWS IoT Device SDK for Python v2.

import time
import traceback

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)

TIMEOUT = 10

ipc_client = awsiot.greengrasscoreipc.connect()

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        try:
            message = str(event.message.payload, "utf-8")
            topic_name = event.message.topic_name
            # Handle message.
        except:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        return True  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        # Handle close.
        pass

topic = "my/topic"
qos = QOS.AT_MOST_ONCE

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)

# Keep the main thread alive, or the process will exit.
while True:
    time.sleep(10)

# To stop subscribing, close the operation stream.
operation.close()
C++
Example: Subscribe to messages
#include <iostream>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {

    public:
        virtual ~IoTCoreResponseHandler() {}

    private:

        void OnStreamEvent(IoTCoreMessage *response) override {
            auto message = response->GetMessage();
            if (message.has_value() && message.value().GetPayload().has_value()) {
                auto messageBytes = message.value().GetPayload().value();
                std::string messageString(messageBytes.begin(), messageBytes.end());
                std::string topicName = message.value().GetTopicName().value().c_str();
                // Handle message.
            }
        }

        bool OnStreamError(OperationError *error) override {
            // Handle error.
            return false; // Return true to close stream, false to keep stream open.
        }

        void OnStreamClosed() override {
            // Handle close.
        }
};

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    String topic("my/topic");
    QOS qos = QOS_AT_MOST_ONCE;
    int timeout = 10;

    SubscribeToIoTCoreRequest request;
    request.SetTopicName(topic);
    request.SetQos(qos);
    auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
    auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            (void)error;
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
        exit(-1);
    }

    // Keep the main thread alive, or the process will exit.
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }

    operation->Close();
    return 0;
}
JavaScript
Example: Subscribe to messages
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";

class SubscribeToIoTCore {
    private ipcClient: greengrasscoreipc.Client
    private readonly topic: string;

    constructor() {
        // define your own constructor, e.g.
        this.topic = "<define_your_topic>";
        this.subscribeToIoTCore().then(r => console.log("Started workflow"));
    }

    private async subscribeToIoTCore() {
        try {
            const request: SubscribeToIoTCoreRequest = {
                topicName: this.topic,
                qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
            }

            this.ipcClient = await getIpcClient();

            const streamingOperation = this.ipcClient.subscribeToIoTCore(request);

            streamingOperation.on('message', (message: IoTCoreMessage) => {
                // parse the message depending on your use cases, e.g.
                if (message.message && message.message.payload) {
                    const receivedMessage = message.message.payload.toString();
                }
            });

            streamingOperation.on('streamError', (error : RpcError) => {
                // define your own error handling logic
            });

            streamingOperation.on('ended', () => {
                // define your own logic
            });

            await streamingOperation.activate();

            // Keep the main thread alive, or the process will exit.
            await new Promise((resolve) => setTimeout(resolve, 10000))
        } catch (e) {
            // parse the error depending on your use cases
            throw e
        }
    }
}

export async function getIpcClient(){
    try {
        const ipcClient = greengrasscoreipc.createClient();
        await ipcClient.connect()
            .catch(error => {
                // parse the error depending on your use cases
                throw error;
            });
        return ipcClient
    } catch (err) {
        // parse the error depending on your use cases
        throw err
    }
}

// starting point
const subscribeToIoTCore = new SubscribeToIoTCore();

Examples

Use the following examples to learn how to use the AWS IoT Core MQTT IPC service in your components.

The following example recipe allows the component to publish to all topics.

JSON
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.IoTCorePublisherCpp",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that publishes MQTT messages to IoT Core.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.IoTCorePublisherCpp:mqttproxy:1": {
            "policyDescription": "Allows access to publish to all topics.",
            "operations": [
              "aws.greengrass#PublishToIoTCore"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "run": "{artifacts:path}/greengrassv2_iotcore_publisher"
      },
      "Artifacts": [
        {
          "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher",
          "Permission": {
            "Execute": "OWNER"
          }
        }
      ]
    }
  ]
}
YAML
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCorePublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes MQTT messages to IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        com.example.IoTCorePublisherCpp:mqttproxy:1:
          policyDescription: Allows access to publish to all topics.
          operations:
            - aws.greengrass#PublishToIoTCore
          resources:
            - "*"
Manifests:
  - Lifecycle:
      run: "{artifacts:path}/greengrassv2_iotcore_publisher"
    Artifacts:
      - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher
        Permission:
          Execute: OWNER
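The recipe above grants access to every topic through the "*" resource. A narrower policy can list topic strings or MQTT topic wildcards (# and +) instead. The following is a minimal sketch of how such filters match topics under standard MQTT semantics. It is meant for reasoning about which resource strings to put in a policy and is not the Greengrass authorization implementation. The function names splitLevels and topicMatchesFilter are hypothetical, and treating "*" as "match everything" is an assumption that mirrors the recipe.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: split an MQTT topic or filter into levels on '/'.
std::vector<std::string> splitLevels(const std::string &s) {
    std::vector<std::string> levels;
    std::string::size_type start = 0;
    while (true) {
        std::string::size_type pos = s.find('/', start);
        if (pos == std::string::npos) {
            levels.push_back(s.substr(start));
            break;
        }
        levels.push_back(s.substr(start, pos - start));
        start = pos + 1;
    }
    return levels;
}

// Hypothetical helper: returns true if `topic` matches `filter`.
// '+' matches exactly one level; '#' matches all remaining levels
// (including none) and is only valid as the last level of the filter.
bool topicMatchesFilter(const std::string &filter, const std::string &topic) {
    if (filter == "*") {
        return true; // Assumption: "*" means all topics, as in the recipe.
    }
    const std::vector<std::string> f = splitLevels(filter);
    const std::vector<std::string> t = splitLevels(topic);
    for (std::size_t i = 0; i < f.size(); ++i) {
        if (f[i] == "#") {
            return i == f.size() - 1;
        }
        if (i >= t.size()) {
            return false; // The filter has more levels than the topic.
        }
        if (f[i] != "+" && f[i] != t[i]) {
            return false; // Literal level mismatch.
        }
    }
    return f.size() == t.size();
}
```

For example, under these rules the filter test/+/cpp matches test/topic/cpp but not test/topic, while test/# matches both. The sketch ignores the MQTT special case for topics that begin with $.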

The following example C++ application demonstrates how to use the AWS IoT Core MQTT IPC service to publish messages to AWS IoT Core.

#include <chrono>
#include <iostream>
#include <thread>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        std::cout << "OnConnectCallback" << std::endl;
    }

    void OnDisconnectCallback(RpcError error) override {
        std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
        exit(-1);
    }

    bool OnErrorCallback(RpcError error) override {
        std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
        return true;
    }
};

int main() {
    String message("Hello from the Greengrass IPC MQTT publisher (C++).");
    String topic("test/topic/cpp");
    QOS qos = QOS_AT_LEAST_ONCE;
    int timeout = 10;

    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    // Publish the same message every 5 seconds.
    while (true) {
        PublishToIoTCoreRequest request;
        Vector<uint8_t> messageData({message.begin(), message.end()});
        request.SetTopicName(topic);
        request.SetPayload(messageData);
        request.SetQos(qos);

        auto operation = ipcClient.NewPublishToIoTCore();
        auto activate = operation->Activate(request, nullptr);
        activate.wait();

        auto responseFuture = operation->GetResult();
        if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
            std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
            exit(-1);
        }

        auto response = responseFuture.get();
        if (response) {
            std::cout << "Successfully published to topic: " << topic << std::endl;
        } else {
            // An error occurred.
            std::cout << "Failed to publish to topic: " << topic << std::endl;
            auto errorType = response.GetResultType();
            if (errorType == OPERATION_ERROR) {
                auto *error = response.GetOperationError();
                std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
            } else {
                std::cout << "RPC error: " << response.GetRpcError().StatusToString() << std::endl;
            }
            exit(-1);
        }

        std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    return 0;
}
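The publisher bounds its wait on the response future with wait_for and exits if the timeout elapses. The same pattern can be sketched in isolation with only the C++ standard library. The helper name waitForValue is hypothetical and is not part of the AWS IoT Device SDK. It reports a timeout to the caller instead of exiting, which is one possible design choice among several.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper (not part of the AWS IoT Device SDK): waits up to
// `timeout` for `future` to become ready. On success, stores the value in
// `out` and returns true. Otherwise returns false and leaves `out` unchanged.
template <typename T>
bool waitForValue(std::future<T> &future, std::chrono::milliseconds timeout, T &out) {
    if (future.wait_for(timeout) != std::future_status::ready) {
        return false; // Timed out, or the task is deferred and has not run.
    }
    out = future.get();
    return true;
}
```

Returning false leaves the decision to the caller, which might retry the publish, log the failure, or exit as the example above does.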

The following example recipe allows the component to subscribe to all topics.

JSON
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.IoTCoreSubscriberCpp",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.IoTCoreSubscriberCpp:mqttproxy:1": {
            "policyDescription": "Allows access to subscribe to all topics.",
            "operations": [
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": [
              "*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Lifecycle": {
        "run": "{artifacts:path}/greengrassv2_iotcore_subscriber"
      },
      "Artifacts": [
        {
          "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber",
          "Permission": {
            "Execute": "OWNER"
          }
        }
      ]
    }
  ]
}
YAML
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCoreSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to MQTT messages from IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        com.example.IoTCoreSubscriberCpp:mqttproxy:1:
          policyDescription: Allows access to subscribe to all topics.
          operations:
            - aws.greengrass#SubscribeToIoTCore
          resources:
            - "*"
Manifests:
  - Lifecycle:
      run: "{artifacts:path}/greengrassv2_iotcore_subscriber"
    Artifacts:
      - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber
        Permission:
          Execute: OWNER

The following example C++ application demonstrates how to use the AWS IoT Core MQTT IPC service to subscribe to messages from AWS IoT Core.

#include <chrono>
#include <iostream>
#include <thread>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace Aws::Crt;
using namespace Aws::Greengrass;

class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
    virtual ~IoTCoreResponseHandler() {}

private:
    // Called for each message received on the subscribed topic.
    void OnStreamEvent(IoTCoreMessage *response) override {
        auto message = response->GetMessage();
        if (message.has_value() && message.value().GetPayload().has_value()) {
            auto messageBytes = message.value().GetPayload().value();
            std::string messageString(messageBytes.begin(), messageBytes.end());
            std::string messageTopic = message.value().GetTopicName().value().c_str();
            std::cout << "Received new message on topic: " << messageTopic << std::endl;
            std::cout << "Message: " << messageString << std::endl;
        }
    }

    bool OnStreamError(OperationError *error) override {
        std::cout << "Received an operation error: ";
        if (error->GetMessage().has_value()) {
            std::cout << error->GetMessage().value();
        }
        std::cout << std::endl;
        return false; // Return true to close stream, false to keep stream open.
    }

    void OnStreamClosed() override {
        std::cout << "Subscribe to IoT Core stream closed." << std::endl;
    }
};

class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    void OnConnectCallback() override {
        std::cout << "OnConnectCallback" << std::endl;
    }

    void OnDisconnectCallback(RpcError error) override {
        std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
        exit(-1);
    }

    bool OnErrorCallback(RpcError error) override {
        std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
        return true;
    }
};

int main() {
    String topic("test/topic/cpp");
    QOS qos = QOS_AT_LEAST_ONCE;
    int timeout = 10;

    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    GreengrassCoreIpcClient ipcClient(bootstrap);
    auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }

    SubscribeToIoTCoreRequest request;
    request.SetTopicName(topic);
    request.SetQos(qos);
    auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
    auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
    auto activate = operation->Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation->GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }

    auto response = responseFuture.get();
    if (response) {
        std::cout << "Successfully subscribed to topic: " << topic << std::endl;
    } else {
        // An error occurred.
        std::cout << "Failed to subscribe to topic: " << topic << std::endl;
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
        } else {
            std::cout << "RPC error: " << response.GetRpcError().StatusToString() << std::endl;
        }
        exit(-1);
    }

    // Keep the main thread alive, or the process will exit.
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }

    operation->Close();
    return 0;
}
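Both C++ examples convert between text and raw payload bytes: the publisher builds a byte vector from the message string's iterators, and the subscriber rebuilds a string from the received bytes. The following sketch shows that round trip with std::string and std::vector in place of the SDK's Aws::Crt types. The helper names toPayload and fromPayload are hypothetical.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: copy the characters of `text` into a byte vector,
// as the publisher example does before calling SetPayload.
std::vector<std::uint8_t> toPayload(const std::string &text) {
    return std::vector<std::uint8_t>(text.begin(), text.end());
}

// Hypothetical helper: rebuild a string from received payload bytes,
// as the subscriber example does in OnStreamEvent.
std::string fromPayload(const std::vector<std::uint8_t> &payload) {
    return std::string(payload.begin(), payload.end());
}
```

The conversion copies bytes without interpreting them, so it suits text payloads. Binary payloads are better kept as byte vectors than converted to strings for printing.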