Gunakan AWS IoT Device SDK untuk berkomunikasi dengan inti Greengrass, komponen lain, dan AWS IoT Core - AWS IoT Greengrass

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Gunakan AWS IoT Device SDK untuk berkomunikasi dengan inti Greengrass, komponen lain, dan AWS IoT Core

Komponen yang berjalan pada perangkat inti Anda dapat menggunakan pustaka komunikasi interproses AWS IoT Greengrass Inti (IPC) di dalam AWS IoT Device SDK untuk berkomunikasi dengan AWS IoT Greengrass inti dan komponen Greengrass lainnya. Untuk mengembangkan dan menjalankan komponen khusus yang menggunakan IPC, Anda harus menggunakan AWS IoT Device SDK untuk terhubung ke layanan AWS IoT Greengrass Core IPC dan melakukan operasi IPC.

Antarmuka IPC mendukung dua jenis operasi:

  • Permintaan/tanggapan

    Komponen mengirim permintaan ke layanan IPC dan menerima respons yang berisi hasil permintaan.

  • Berlangganan

    Komponen mengirim permintaan berlangganan ke layanan IPC dan mengharapkan aliran pesan peristiwa sebagai tanggapan. Komponen menyediakan bagian yang menangani langganan yang menangani pesan peristiwa, kesalahan, dan penutupan aliran. AWS IoT Device SDK Termasuk antarmuka handler dengan respon yang benar dan jenis peristiwa untuk setiap operasi IPC. Untuk informasi selengkapnya, lihat Berlangganan pengaliran peristiwa IPC.

Versi klien IPC

Dalam versi yang lebih baru dari Java dan Python SDK, AWS IoT Greengrass menyediakan versi perbaikan dari klien IPC, yang disebut IPC client V2. Klien IPC V2:

  • Mengurangi jumlah kode yang perlu Anda tulis untuk menggunakan operasi IPC dan membantu menghindari kesalahan umum yang dapat terjadi dengan klien IPC V1.

  • Memanggil callback handler langganan di thread terpisah, sehingga Anda sekarang dapat menjalankan kode pemblokiran, termasuk panggilan fungsi IPC tambahan, dalam callback handler langganan. Klien IPC V1 menggunakan thread yang sama untuk berkomunikasi dengan server IPC dan memanggil callback handler langganan.

  • Memungkinkan Anda memanggil operasi berlangganan menggunakan ekspresi Lambda (Java) atau fungsi (Python). Klien IPC V1 mengharuskan Anda untuk menentukan kelas handler berlangganan.

  • Menyediakan versi sinkron dan asinkron dari setiap operasi IPC. Klien IPC V1 hanya menyediakan versi asinkron dari setiap operasi.

Kami menyarankan Anda menggunakan klien IPC V2 untuk memanfaatkan peningkatan ini. Namun, banyak contoh dalam dokumentasi ini dan dalam beberapa konten online hanya menunjukkan bagaimana menggunakan klien IPC V1. Anda dapat menggunakan contoh dan tutorial berikut untuk melihat komponen sampel yang menggunakan klien IPC V2:

Saat ini, AWS IoT Device SDK untuk C++ v2 hanya mendukung klien IPC V1.

SDK yang didukung untuk komunikasi antar proses

Pustaka AWS IoT Greengrass Core IPC disertakan dalam versi berikut AWS IoT Device SDK .

Connect ke layanan AWS IoT Greengrass Core IPC

Untuk menggunakan komunikasi interprocess dalam komponen kustom Anda, Anda harus membuat koneksi ke soket server IPC yang dijalankan oleh perangkat lunak AWS IoT Greengrass Core. Selesaikan tugas-tugas berikut untuk mengunduh dan menggunakan AWS IoT Device SDK dalam bahasa pilihan Anda.

Untuk menggunakan AWS IoT Device SDK untuk Java v2 (klien IPC V2)
  1. Unduh AWS IoT Device SDK untuk Java v2 (v1.6.0 atau yang lebih baru).

  2. Lakukan salah satu dari berikut ini untuk menjalankan kode kustom Anda dalam komponen Anda:

    • Bangun komponen Anda sebagai file JAR yang menyertakan AWS IoT Device SDK, dan jalankan file JAR ini dalam resep komponen Anda.

    • Tentukan AWS IoT Device SDK JAR sebagai artefak komponen, dan tambahkan artefak itu ke classpath saat Anda menjalankan aplikasi dalam resep komponen Anda.

  3. Gunakan kode berikut untuk membuat klien IPC.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
Untuk menggunakan AWS IoT Device SDK untuk Python v2 (klien IPC V2)
  1. Unduh AWS IoT Device SDK untuk Python (v1.9.0 atau yang lebih baru).

  2. Tambahkan langkah instalasi SDK ke siklus hidup instalasi dalam resep komponen Anda.

  3. Buat koneksi ke layanan AWS IoT Greengrass Core IPC. Gunakan kode berikut untuk membuat klien IPC.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

Untuk membangun AWS IoT Device SDK v2 untuk C ++, perangkat harus memiliki alat berikut:

  • C++ 11 atau yang lebih baru

  • CMake 3.1 atau yang lebih baru

  • Salah satu penyusun berikut:

    • GCC 4.8 atau yang lebih baru

    • Clang 3.9 atau yang lebih baru

    • MSVC 2015 atau yang lebih baru

Untuk menggunakan AWS IoT Device SDK untuk C++ v2
  1. Unduh AWS IoT Device SDK untuk C++ v2 (v1.17.0 atau yang lebih baru).

  2. Ikuti petunjuk penginstalan di README untuk membangun C++ v2 dari sumber. AWS IoT Device SDK

  3. Dalam alat bangun C ++ Anda, tautkan pustaka IPC Greengrass, AWS::GreengrassIpc-cpp, yang telah Anda bangun pada langkah sebelumnya. Contoh CMakeLists.txt berikut menautkan pustaka IPC Greengrass untuk proyek yang Anda bangun dengan CMake.

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. Dalam kode komponen Anda, buat koneksi ke layanan AWS IoT Greengrass Core IPC untuk membuat klien IPC ()Aws::Greengrass::GreengrassCoreIpcClient. Anda harus menentukan pengelola siklus hidup koneksi IPC yang menangani koneksi IPC, pemutusan, dan peristiwa kesalahan. Contoh berikut membuat klien IPC dan pengelola siklus hidup koneksi IPC yang mencetak ketika klien IPC terhubung, terputus, dan menemukan kesalahan.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. Untuk menjalankan kode kustom Anda dalam komponen Anda, bangun kode Anda sebagai artefak biner, dan jalankan artefak biner dalam resep komponen Anda. Atur Execute izin artefak OWNER untuk mengaktifkan perangkat lunak AWS IoT Greengrass Core menjalankan artefak biner.

    Bagian Manifests resep komponen Anda ini mungkin terlihat serupa dengan yang berikut ini.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

Untuk membangun AWS IoT Device SDK for JavaScript v2 untuk digunakan dengan NodeJS, perangkat harus memiliki alat berikut:

  • NodeJS 10.0 atau yang lebih baru

    • Jalankan node -v untuk memeriksa versi Node.

  • CMake 3.1 atau yang lebih baru

Untuk menggunakan AWS IoT Device SDK for JavaScript v2 (klien IPC V1)
  1. Unduh AWS IoT Device SDK untuk JavaScript v2 (v1.12.10 atau yang lebih baru).

  2. Ikuti petunjuk penginstalan di README untuk membangun AWS IoT Device SDK for JavaScript v2 dari sumber.

  3. Buat koneksi ke layanan AWS IoT Greengrass Core IPC. Selesaikan langkah-langkah berikut untuk membuat klien IPC dan membuat sambungan.

  4. Gunakan kode berikut untuk membuat klien IPC.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Gunakan kode berikut untuk membuat sambungan dari komponen Anda ke inti Greengrass.

    await client.connect();

Otorisasi komponen untuk melakukan operasi IPC

Untuk mengizinkan komponen kustom Anda untuk menggunakan beberapa operasi IPC, Anda harus menentukan kebijakan otorisasi yang memungkinkan komponen untuk melakukan operasi pada sumber daya tertentu. Setiap kebijakan otorisasi menentukan daftar operasi dan daftar sumber daya yang diizinkan oleh kebijakan. Sebagai contoh, layanan IPC olah pesan mempublikasikan/berlangganan menentukan operasi publikasi dan langganan untuk sumber daya topik. Anda dapat menentukan wildcard * untuk mengizinkan akses ke semua operasi atau semua sumber daya.

Anda menentukan kebijakan otorisasi dengan parameter accessControl konfigurasi, yang dapat Anda atur dalam resep komponen atau saat Anda menerapkan komponen. Objek accessControl memetakan pengidentifikasi layanan IPC pada daftar kebijakan otorisasi. Anda dapat menentukan beberapa kebijakan otorisasi untuk setiap layanan IPC untuk mengontrol akses. Setiap kebijakan otorisasi memiliki ID kebijakan, yang harus unik di antara semua komponen.

Tip

Untuk membuat ID kebijakan yang unik, Anda dapat menggabungkan nama komponen, nama layanan IPC, dan counter. Sebagai contoh, sebuah komponen bernama com.example.HelloWorld mungkin menentukan dua kebijakan otorisasi publikasi/berlangganan dengan ID berikut:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Kebijakan otorisasi menggunakan format berikut. Objek ini adalah parameter konfigurasi accessControl.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Wildcard dalam kebijakan otorisasi

Anda dapat menggunakan * wildcard dalam resources elemen kebijakan otorisasi IPC untuk mengizinkan akses ke beberapa sumber daya dalam satu kebijakan otorisasi.

  • Di semua versi inti Greengrass, Anda dapat menentukan * satu karakter sebagai sumber daya untuk memungkinkan akses ke semua sumber daya.

  • Di Greengrass nucleus v2.6.0 dan yang lebih baru, Anda dapat menentukan karakter dalam sumber daya agar sesuai * dengan kombinasi karakter apa pun. Misalnya, Anda dapat menentukan factory/1/devices/Thermostat*/status untuk mengizinkan akses ke topik status untuk semua perangkat termostat di pabrik, tempat nama setiap perangkat dimulaiThermostat.

Saat menentukan kebijakan otorisasi untuk layanan AWS IoT Core MQTT IPC, Anda juga dapat menggunakan wildcard MQTT (dan) untuk mencocokkan beberapa sumber daya. + # Untuk informasi selengkapnya, lihat wildcard MQTT dalam kebijakan otorisasi AWS IoT Core MQTT IPC.

Variabel resep dalam kebijakan otorisasi

Jika Anda menggunakan Greengrass nucleus v2.6.0 atau yang lebih baru, dan Anda menyetel opsi konfigurasi Greengrass nucleus ke, Anda dapat menggunakan variabel resep dalam kebijakan interpolateComponentConfigurationotorisasi. true{iot:thingName} Bila Anda memerlukan kebijakan otorisasi yang menyertakan nama perangkat inti, seperti untuk topik MQTT atau bayangan perangkat, Anda dapat menggunakan variabel resep ini untuk mengonfigurasi kebijakan otorisasi tunggal untuk sekelompok perangkat inti. Misalnya, Anda dapat mengizinkan akses komponen ke sumber daya berikut untuk operasi IPC bayangan.

$aws/things/{iot:thingName}/shadow/

Karakter khusus dalam kebijakan otorisasi

Untuk menentukan literal * atau ? karakter dalam kebijakan otorisasi, Anda harus menggunakan urutan escape. Urutan escape berikut menginstruksikan perangkat lunak AWS IoT Greengrass Core untuk menggunakan nilai literal alih-alih makna khusus karakter. Misalnya, * karakter adalah wildcard yang cocok dengan kombinasi karakter apa pun.

Karakter literal Karakter pelarian Catatan

*

${*}

?

${?}

AWS IoT Greengrass saat ini tidak mendukung ? wildcard, yang cocok dengan karakter tunggal mana pun.

$

${$}

Gunakan urutan escape ini untuk mencocokkan sumber daya yang berisi${. Misalnya, untuk mencocokkan sumber daya bernama${resourceName}, Anda harus menentukan${$}{resourceName}. Jika tidak, untuk mencocokkan sumber daya yang berisi$, Anda dapat menggunakan literal$, seperti untuk mengizinkan akses ke topik yang dimulai dengan$aws.

Contoh kebijakan otorisasi

Anda dapat mereferensikan contoh kebijakan otorisasi berikut untuk membantu Anda mengonfigurasi kebijakan otorisasi untuk komponen Anda.

contoh Contoh resep komponen dengan kebijakan otorisasi

Contoh resep komponen berikut meliputi objek accessControl yang menentukan kebijakan otorisasi. Kebijakan ini mengotorisasi komponen com.example.HelloWorld untuk dipublikasikan ke topik test/topic.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
contoh Contoh pembaruan konfigurasi komponen dengan kebijakan otorisasi

Contoh pemutakhiran konfigurasi berikut dalam penerapan menentukan untuk mengonfigurasi komponen dengan accessControl objek yang mendefinisikan kebijakan otorisasi. Kebijakan ini mengotorisasi komponen com.example.HelloWorld untuk dipublikasikan ke topik test/topic.

Console
Konfigurasi untuk digabungkan
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
AWS CLI

Perintah berikut membuat penyebaran ke perangkat inti.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

hello-world-deployment.jsonFile berisi dokumen JSON berikut.

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

Perintah Greengrass CLI berikut membuat penerapan lokal pada perangkat inti.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

hello-world-configuration.jsonFile berisi dokumen JSON berikut.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Berlangganan pengaliran peristiwa IPC

Anda dapat menggunakan operasi IPC untuk berlangganan aliran peristiwa pada perangkat inti Greengrass. Untuk menggunakan operasi berlangganan, tentukan penanganan langganan dan buat permintaan ke layanan IPC. Kemudian, klien IPC menjalankan fungsi penanganan langganan setiap kali perangkat inti mengalirkan pesan peristiwa ke komponen Anda.

Anda dapat menutup suatu langganan untuk menghentikan pemrosesan pesan peristiwa. Untuk melakukannya, hubungi closeStream() (Java), close() (Python), atau Close() (C++) pada objek operasi langganan yang Anda gunakan untuk membuka langganan.

Layanan AWS IoT Greengrass Core IPC mendukung operasi berlangganan berikut:

Tentukan penanganan langganan

Untuk menentukan handler langganan, tentukan fungsi callback yang menangani pesan peristiwa, kesalahan, dan penutupan aliran. Jika Anda menggunakan klien IPC V1, Anda harus mendefinisikan fungsi-fungsi ini di kelas. Jika Anda menggunakan klien IPC V2, yang tersedia di versi SDK Java dan Python yang lebih baru, Anda dapat menentukan fungsi-fungsi ini tanpa membuat kelas handler berlangganan.

Java

Jika Anda menggunakan klien IPC V1, Anda harus mengimplementasikan antarmuka generiksoftware.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>. StreamEventTypeadalah jenis pesan acara untuk operasi berlangganan. Tentukan fungsi berikut untuk menangani pesan peristiwa, kesalahan, dan penutupan aliran.

Jika Anda menggunakan klien IPC V2, Anda dapat menentukan fungsi-fungsi ini di luar kelas handler langganan atau menggunakan ekspresi lambda.

void onStreamEvent(StreamEventType event)

Callback yang dipanggil oleh klien IPC ketika menerima pesan peristiwa, seperti pesan MQTT atau notifikasi pembaruan komponen.

boolean onStreamError(Throwable error)

Callback yang dipanggil oleh klien IPC ketika terjadi kesalahan aliran.

Kembali BETUL untuk menutup aliran langganan sebagai akibat dari kesalahan, atau kembali SALAH untuk membuat aliran tetap terbuka.

void onStreamClosed()

Callback yang dipanggil oleh klien IPC ketika aliran menutup.

Python

Jika Anda menggunakan klien IPC V1, Anda harus memperluas kelas penangan respons aliran yang sesuai dengan operasi langganan. AWS IoT Device SDK Termasuk kelas handler berlangganan untuk setiap operasi berlangganan. StreamEventTypeadalah jenis pesan acara untuk operasi berlangganan. Tentukan fungsi berikut untuk menangani pesan peristiwa, kesalahan, dan penutupan aliran.

Jika Anda menggunakan klien IPC V2, Anda dapat menentukan fungsi-fungsi ini di luar kelas handler langganan atau menggunakan ekspresi lambda.

def on_stream_event(self, event: StreamEventType) -> None

Callback yang dipanggil oleh klien IPC ketika menerima pesan peristiwa, seperti pesan MQTT atau notifikasi pembaruan komponen.

def on_stream_error(self, error: Exception) -> bool

Callback yang dipanggil oleh klien IPC ketika terjadi kesalahan aliran.

Kembali BETUL untuk menutup aliran langganan sebagai akibat dari kesalahan, atau kembali SALAH untuk membuat aliran tetap terbuka.

def on_stream_closed(self) -> None

Callback yang dipanggil oleh klien IPC ketika aliran menutup.

C++

Terapkan kelas yang diturunkan dari penanganan respons aliran yang sesuai dengan operasi berlangganan. AWS IoT Device SDK Termasuk kelas dasar handler berlangganan untuk setiap operasi berlangganan. StreamEventTypeadalah jenis pesan acara untuk operasi berlangganan. Tentukan fungsi berikut untuk menangani pesan peristiwa, kesalahan, dan penutupan aliran.

void OnStreamEvent(StreamEventType *event)

Callback yang dipanggil oleh klien IPC ketika menerima pesan peristiwa, seperti pesan MQTT atau notifikasi pembaruan komponen.

bool OnStreamError(OperationError *error)

Callback yang dipanggil oleh klien IPC ketika terjadi kesalahan aliran.

Kembali BETUL untuk menutup aliran langganan sebagai akibat dari kesalahan, atau kembali SALAH untuk membuat aliran tetap terbuka.

void OnStreamClosed()

Callback yang dipanggil oleh klien IPC ketika aliran menutup.

JavaScript

Terapkan kelas yang diturunkan dari penanganan respons aliran yang sesuai dengan operasi berlangganan. AWS IoT Device SDK Termasuk kelas dasar handler berlangganan untuk setiap operasi berlangganan. StreamEventTypeadalah jenis pesan acara untuk operasi berlangganan. Tentukan fungsi berikut untuk menangani pesan peristiwa, kesalahan, dan penutupan aliran.

on(event: 'ended', listener: StreamingOperationEndedListener)

Callback yang dipanggil oleh klien IPC ketika aliran menutup.

on(event: 'streamError', listener: StreamingRpcErrorListener)

Callback yang dipanggil oleh klien IPC ketika terjadi kesalahan aliran.

Kembali BETUL untuk menutup aliran langganan sebagai akibat dari kesalahan, atau kembali SALAH untuk membuat aliran tetap terbuka.

on(event: 'message', listener: (message: InboundMessageType) => void)

Callback yang dipanggil oleh klien IPC ketika menerima pesan peristiwa, seperti pesan MQTT atau notifikasi pembaruan komponen.

Contoh penanganan langganan

Contoh berikut menunjukkan cara menggunakan operasi SubscribeToTopic dan penanganan langganan pada pesan publikasi/langganan lokal.

Java (IPC client V2)
contoh Contoh: Berlangganan pesan publikasi/langganan lokal
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
contoh Contoh: Berlangganan pesan publikasi/langganan lokal
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
contoh Contoh: Berlangganan pesan publikasi/langganan lokal
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
contoh Contoh: Berlangganan pesan publikasi/langganan lokal
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

Praktik terbaik IPC

Praktik terbaik untuk menggunakan IPC dalam komponen khusus berbeda antara klien IPC V1 dan klien IPC V2. Ikuti praktik terbaik untuk versi klien IPC yang Anda gunakan.

IPC client V2

Klien IPC V2 menjalankan fungsi callback di thread terpisah, jadi dibandingkan dengan klien IPC V1, ada lebih sedikit pedoman yang harus Anda ikuti saat Anda menggunakan IPC dan menulis fungsi penangan langganan.

  • Gunakan kembali satu klien IPC

    Setelah Anda membuat klien IPC, tetap buka dan gunakan kembali untuk semua operasi IPC. Membuat beberapa klien menggunakan sumber daya tambahan dan dapat mengakibatkan kebocoran sumber daya.

  • Tangani pengecualian

    Klien IPC V2 mencatat pengecualian yang tidak tertangkap dalam fungsi penangan langganan. Anda harus menangkap pengecualian dalam fungsi handler Anda untuk menangani kesalahan yang terjadi dalam kode Anda.

IPC client V1

Klien IPC V1 menggunakan utas tunggal yang berkomunikasi dengan server IPC dan memanggil penangan langganan. Anda harus mempertimbangkan perilaku sinkron ini ketika Anda menuliskan fungsi penanganan langganan.

  • Gunakan kembali satu klien IPC

    Setelah Anda membuat klien IPC, tetap buka dan gunakan kembali untuk semua operasi IPC. Membuat beberapa klien menggunakan sumber daya tambahan dan dapat mengakibatkan kebocoran sumber daya.

  • Jalankan kode pemblokiran secara asinkron

    Klien IPC V1 tidak dapat mengirim permintaan baru atau memproses pesan acara baru saat utas diblokir. Anda harus menjalankan kode pemblokiran di thread terpisah yang Anda jalankan dari fungsi handler. Kode pemblokiran meliputi panggilan sleep, loop yang terus berjalan, dan permintaan I/O sinkron yang membutuhkan waktu untuk diselesaikan.

  • Kirim permintaan IPC baru secara asinkron

    Klien IPC V1 tidak dapat mengirim permintaan baru dari dalam fungsi handler langganan, karena permintaan memblokir fungsi handler jika Anda menunggu respons. Anda harus mengirim permintaan IPC di thread terpisah yang Anda jalankan dari fungsi handler.

  • Tangani pengecualian

    Klien IPC V1 tidak menangani pengecualian yang tidak tertangkap dalam fungsi penangan langganan. Jika fungsi penanganan Anda melempar pengecualian, langganan tersebut akan menutup, dan pengecualian tidak akan muncul dalam log komponen Anda. Anda harus menangkap pengecualian dalam fungsi handler Anda untuk menjaga langganan tetap terbuka dan mencatat kesalahan yang terjadi dalam kode Anda.