AWS IoT Device SDK を使用して Greengrass nucleus、その他のコンポーネント、および AWS IoT Core と通信する
コアデバイスで実行されているコンポーネントは、AWS IoT Greengrass Core プロセス間通信 (IPC) ライブラリを AWS IoT Device SDK で使用して、AWS IoT Greengrass nucleus やその他の Greengrass コンポーネントと通信できます。IPC を使用するカスタムコンポーネントを開発して実行するには、AWS IoT Device SDK を使用して AWS IoT Greengrass Core IPC サービスに接続し、IPC オペレーションを実行する必要があります。
IPC インターフェイスは、以下の 2 種類のオペレーションをサポートします。
-
リクエスト/レスポンス
コンポーネントは IPC サービスにリクエストを送信し、リクエストの結果を含むレスポンスを受け取ります。
-
サブスクリプション
コンポーネントはサブスクリプション要求を IPC サービスに送信し、イベントメッセージのストリームをレスポンスとして期待します。コンポーネントは、イベントメッセージ、エラー、およびストリームクロージャを処理するサブスクリプションハンドラーを提供します。AWS IoT Device SDK には、各 IPC オペレーションの正しいレスポンスとイベントタイプを持つハンドラーインターフェイスが含まれます。詳細については、「IPC イベントストリームへのサブスクライブ」を参照してください。
IPC クライアントのバージョン
それ以降の Java および Python SDK のバージョンでは、IPC クライアント V2 と呼ばれる IPC クライアントの改良版を、AWS IoT Greengrass が提供します。IPC クライアント V2
-
IPC オペレーションの使用に必要なコードの記述を削減し、IPC クライアント V1 で発生する一般的なエラーの回避に役立ちます。
-
サブスクリプションハンドラーコールバックを別のスレッドで呼び出すため、IPC 関数呼び出しの追加呼び出しを含めたブロッキングコードを、サブスクリプションハンドラーコールバック内で実行できるようになりました。IPC クライアント V1 は同じスレッドを使用して、IPC サーバーとの通信とサブスクリプションハンドラーの呼び出しを行います。
-
Lambda 式 (Java の場合) または関数 (Python の場合) を使用して、サブスクリプションオペレーションを呼び出せます。IPC クライアント V1 では、サブスクリプションハンドラクラスの定義が必要です。
-
同期バージョンと非同期バージョンの IPC オペレーションを提供します。IPC クライアント V1 では、非同期バージョンのオペレーションのみ提供します。
これらの機能改善を利用するには、IPC クライアント V2 の使用をお勧めします。ただし、このドキュメントやオンラインコンテンツに掲載されている例の多くは、IPC クライアント V1 の使用方法のみ紹介しています。次の例とチュートリアルから、IPC クライアント V2 を使用するサンプルのコンポーネントを確認できます。
現在 AWS IoT Device SDK for C++ v2 では、IPC クライアント V1のみをサポートしています。
プロセス間通信でサポートされている SDK
AWS IoT Greengrass Core IPC ライブラリは以下の AWS IoT Device SDK バージョンに含まれています。
AWS IoT Greengrass Core IPC サービスに接続する
カスタムコンポーネントでプロセス間通信を使用するには、AWS IoT Greengrass Core ソフトウェアが稼働する IPC サーバーソケットへの接続を作成する必要があります。AWS IoT Device SDK を任意の言語でダウンロードして使用するには、次のタスクを実行してください。
AWS IoT Device SDK for Java v2 (IPC クライアント V2) を使用するには
-
AWS IoT Device SDK for Java v2 (v1.6.0 以降) をダウンロードします。
-
以下のいずれかを行って、コンポーネントでカスタムコードを実行します。
-
以下のコードを使用して IPC クライアントを作成します。
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
// Use client.
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
System.exit(1);
}
AWS IoT Device SDK for Python v2 (IPC クライアント V2) を使用するには
-
AWS IoT Device SDK for Python (v1.9.0 以降) をダウンロードします。
-
SDK のインストール手順を、コンポーネントの recipe のインストールライフサイクルに追加します。
-
AWS IoT Greengrass Core IPC サービスへの接続を作成します。以下のコードを使用して IPC クライアントを作成します。
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
try:
ipc_client = GreengrassCoreIPCClientV2()
# Use IPC client.
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
AWS IoT Device SDK v2 for C++ をビルドするには、デバイスに以下のツールが必要です。
-
C++ 11 以降
-
CMake 3.1 以降
-
以下のいずれかのコンパイラ:
-
GCC 4.8 以降
-
Clang 3.9 以降
-
MSVC 2015 以降
AWS IoT Device SDK for C++ v2 を使用するには
-
AWS IoT Device SDK for C++ v2 (v1.17.0 以降) をダウンロードします。
-
README のインストール手順に従って、AWS IoT Device SDK for C++ v2 をソースからビルドします。
-
C++ ビルドツールで、前の手順でビルドした Greengrass IPC ライブラリ (AWS::GreengrassIpc-cpp
) をリンクします。以下の CMakeLists.txt
の例は、Greengrass IPC ライブラリを CMake でビルドしたプロジェクトにリンクします。
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)
file(GLOB MAIN_SRC
"*.h"
"*.cpp"
)
add_executable(${PROJECT_NAME} ${MAIN_SRC})
set_target_properties(${PROJECT_NAME} PROPERTIES
LINKER_LANGUAGE CXX
CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
-
コンポーネントコードで AWS IoT Greengrass Core IPC サービスへの接続を作成して、IPC クライアント (Aws::Greengrass::GreengrassCoreIpcClient
) を作成します。IPC の接続、切断、およびエラーイベントを処理する IPC 接続ライフサイクルハンドラーを定義する必要があります。次の例は、IPC クライアントの接続、切断、およびエラーの発生時に出力される IPC クライアントおよび IPC 接続ライフサイクルハンドラーを作成します。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
// Create the IPC client.
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
// Use the IPC client to create an operation request.
// Activate the operation request.
auto activate = operation.Activate(request, nullptr);
activate.wait();
// Wait for Greengrass Core to respond to the request.
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
// Check the result of the request.
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
return 0;
}
-
コンポーネントでカスタムコードを実行するには、コードをバイナリアーティファクトとしてビルドし、コンポーネント recipe でバイナリアーティファクトを実行します。OWNER
へのアーティファクトの Execute
権限を設定して、AWS IoT Greengrass Core ソフトウェアがバイナリアーティファクトを実行できるようにします。
コンポーネント recipe の Manifests
セクションは、次の例のようになります。
- JSON
-
{
...
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
...
Manifests:
- Lifecycle:
run: {artifacts:path}/greengrassv2_pubsub_subscriber
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
Permission:
Execute: OWNER
NodeJS と使用できるように AWS IoT Device SDK for JavaScript v2 をビルドするには、デバイスには次のツールが必要です。
-
NodeJS 10.0 以降
-
CMake 3.1 以降
AWS IoT Device SDK for JavaScript v2 (IPC クライアント V1) を使用するには
-
AWS IoT Device SDK for JavaScript v2 (v1.12.10 以降) をダウンロードします。
-
README のインストール手順に従って、AWS IoT Device SDK for JavaScript v2 をソースからビルドします。
-
AWS IoT Greengrass Core IPC サービスへの接続を作成します。次の手順を実行して、IPC クライアントを作成し、接続を確立します。
-
以下のコードを使用して IPC クライアントを作成します。
import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
let client = greengrascoreipc.createClient();
-
次のコードを使用して、コンポーネントから Greengrass nucleus への接続を確立します。
await client.connect();
コンポーネントに IPC オペレーションの実行を許可する
カスタムコンポーネントが IPC オペレーションの一部を使用できるようにするには、コンポーネントが特定のリソースに対しオペレーションを実行できるようにする承認ポリシーを定義する必要があります。各承認ポリシーは、ポリシーが許可するオペレーションのリストとリソースのリストを定義します。たとえば、パブリッシュ/サブスクライブメッセージング IPC サービスは、トピックリソースの発行およびサブスクライブのオペレーションを定義します。*
ワイルドカードを使用すると、すべてのオペレーションまたはすべてのリソースへのアクセスを許可できます。
承認ポリシーは accessControl
設定パラメータで定義しますが、これはコンポーネント recipe で、またはコンポーネントをデプロイするときに設定できます。accessControl
オブジェクトは、IPC サービス識別子を承認ポリシーのリストにマッピングします。各 IPC サービスに対しては、複数の承認ポリシーを定義してアクセスを制御できます。各承認ポリシーにはポリシー ID があり、すべてのコンポーネントで一意である必要があります。
一意のポリシー ID を作成するには、コンポーネント名、IPC サービス名、およびカウンターを組み合わせることができます。たとえば com.example.HelloWorld
という名前のコンポーネントには、以下の ID を持つ 2 つのパブリッシュ/サブスクライブ承認ポリシーを定義できます。
承認ポリシーは以下の形式を使用します。このオブジェクトは accessControl
設定パラメータです。
- JSON
-
{
"IPC service identifier
": {
"policyId
": {
"policyDescription": "description
",
"operations": [
"operation1
",
"operation2
"
],
"resources": [
"resource1
",
"resource2
"
]
}
}
}
- YAML
-
IPC service identifier
:
policyId
:
policyDescription: description
operations:
- operation1
- operation2
resources:
- resource1
- resource2
承認ポリシー内のワイルドカード
IPC 承認ポリシーの resources
エレメントで *
ワイルドカードを使うと、1 つの承認ポリシーにある複数のリソースにアクセスできます。
-
Greengrass nucleus の全てのバージョンにおいて、リソースとして *
の文字を指定すると、全てのリソースにアクセスできます。
-
Greengrass nucleus の v2.6.0 以降では、リソースで *
の文字を指定すると、どの文字列の組み合わせにも一致します。例えば factory/1/devices/Thermostat*/status
と指定すると、工場内のサーモスタットデバイスのうち、名前が Thermostat
で始まるすべてのデバイスのステータストピックへアクセスできます。
AWS IoT Core MQTT IPC サービスの承認ポリシーを定義する場合、MQTT ワイルドカード (+
および #
) を使用すると、複数のリソースに一致します。詳細については、「 AWS IoT Core MQTT IPC 承認ポリシーにおける MQTT ワイルドカード」を参照してください。
承認ポリシーの recipe 変数
Greengrass nucleus の v2.6.0 以降を使用していて、Greengrass nucleus の interpolateComponentConfiguration 設定オプションを true
に設定している場合、承認ポリシーで {iot:thingName}
recipe 変数を使用できます。MQTT トピックやデバイスシャドウなど、コアデバイスの名前を含む承認ポリシーが必要な場合、この recipe 変数を使用すると、複数のコアデバイスからなるグループに対して、1 つの承認ポリシーを設定できます。例えば、シャドウ IPC オペレーションのために、コンポーネントに次のリソースへのアクセスを許可することができます。
$aws/things/{iot:thingName}/shadow/
承認ポリシーの特殊文字
承認ポリシーで *
または ?
をリテラル文字として指定するには、エスケープシーケンスを使う必要があります。次のエスケープシーケンスは、その文字が持つ特別な意味の代わりに、文字のリテラル値を使うように AWS IoT Greengrass Core ソフトウェアに指示します。例えば、 *
の文字は任意の文字の組み合わせに一致するワイルドカードです。
リテラル文字 |
エスケープシーケンス |
メモ |
*
|
${*}
|
|
?
|
${?}
|
現在 AWS IoT Greengrass は、任意の 1 文字に一致する ? ワイルドカードをサポートしていません。
|
$
|
${$}
|
このエスケープシーケンスを使用すると、${ を含むリソースに一致します。例えば、${resourceName} という名前のリソースに一致させるには、${$}{resourceName} と指定する必要があります。もしくは、$aws で始まるトピックへアクセスできるように、リテラルの $ を使って $ を含むリソースに一致させます。
|
承認ポリシーの例
次の承認ポリシーの例を参照して、コンポーネントの承認ポリシー設定の参考にできます。
例 承認ポリシーを使用したコンポーネント recipe の例
次のコンポーネント recipe の例には、承認ポリシーを定義する accessControl
オブジェクトが含まれます。このポリシーは com.example.HelloWorld
コンポーネントを承認して test/topic
トピックに発行します。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/HelloWorld.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
"com.example.HelloWorld:pubsub:1":
policyDescription: Allows access to publish to test/topic.
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "test/topic"
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/HelloWorld.jar
例 承認ポリシーを使用したコンポーネント設定更新の例
次のデプロイの設定更新の例では、承認ポリシーを定義する accessControl
オブジェクトでコンポーネントを設定することが指定されています。このポリシーは com.example.HelloWorld
コンポーネントを承認して test/topic
トピックに発行します。
- Console
-
- マージする設定
-
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
- AWS CLI
-
次のコマンドは、コアデバイスにデプロイを作成します。
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
hello-world-deployment.json
ファイルには、次の JSON ドキュメントが含まれています。
{
"targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
"deploymentName": "Deployment for MyGreengrassCore",
"components": {
"com.example.HelloWorld": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
}
}
}
}
- Greengrass CLI
-
次の Greengrass CLI コマンドは、コアデバイスにローカルデプロイを作成します。
sudo greengrass-cli deployment create \
--recipeDir recipes \
--artifactDir artifacts \
--merge "com.example.HelloWorld=1.0.0" \
--update-config hello-world-configuration.json
hello-world-configuration.json
ファイルには、次の JSON ドキュメントが含まれています。
{
"com.example.HelloWorld": {
"MERGE": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
}
}
IPC イベントストリームへのサブスクライブ
IPC オペレーションを使用して、Greengrass コアデバイスのイベントのストリームにサブスクライブできます。サブスクライブオペレーションを使用するには、サブスクリプションハンドラーを定義して、IPC サービスへのリクエストを作成します。これで IPC クライアントは、コアデバイスがコンポーネントにイベントメッセージをストリーミングするたびに、サブスクリプションハンドラーの関数が実行するようになります。
サブスクリプションを閉じると、イベントメッセージの処理を停止できます。これを行うには、サブスクリプションを開くときに使用したサブスクリプションオペレーションオブジェクトで、closeStream()
(Java)、close()
(Python)、または Close()
(C++) を呼び出します。
AWS IoT Greengrass Core IPC サービスは、以下のサブスクライブオペレーションをサポートします。
サブスクリプションハンドラーの定義
サブスクリプションハンドラーを定義するには、イベントメッセージ、エラー、およびストリームクロージャを処理するコールバック関数を定義します。IPC クライアント V1 を使用する場合は、クラス内でこれらの関数を定義する必要があります。最近の Java または Python SDK で利用可能な IPC クライアント V2 を使用している場合、サブスクリプションハンドラークラスを作成しなくても、これらの関数を定義できます。
- Java
-
IPC クライアント V1 を使用している場合は、一般的な software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType
>
インターフェイスを実装する必要があります。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
IPC クライアント V2 を使用している場合は、サブスクリプションハンドラクラスの外部でこれらの関数を定義するか、Lambda 式を使います。
void onStreamEvent(StreamEventType
event)
-
IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。
boolean onStreamError(Throwable error)
-
ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
void onStreamClosed()
-
ストリームが閉じたときに IPC クライアントが呼び出すコールバック。
- Python
-
IPC クライアント V1 を使用している場合、サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスを拡張する必要があります。AWS IoT Device SDK には、サブスクリプションオペレーションごとにサブスクリプションハンドラークラスが含まれています。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
IPC クライアント V2 を使用している場合は、サブスクリプションハンドラクラスの外部でこれらの関数を定義するか、Lambda 式を使います。
def on_stream_event(self, event:
StreamEventType
) -> None
-
IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。
def on_stream_error(self, error: Exception) -> bool
-
ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
def on_stream_closed(self) -> None
-
ストリームが閉じたときに IPC クライアントが呼び出すコールバック。
- C++
-
サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスから派生したクラスを実装します。AWS IoT Device SDK には、サブスクリプションオペレーションごとにサブスクリプションハンドラーベースクラスが含まれています。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
void OnStreamEvent(StreamEventType
*event)
-
IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。
bool OnStreamError(OperationError *error)
-
ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
void OnStreamClosed()
-
ストリームが閉じたときに IPC クライアントが呼び出すコールバック。
- JavaScript
-
サブスクリプションオペレーションに対応するストリームレスポンスハンドラークラスから派生したクラスを実装します。AWS IoT Device SDK には、サブスクリプションオペレーションごとにサブスクリプションハンドラーベースクラスが含まれています。StreamEventType
は、サブスクリプションオペレーションのイベントメッセージのタイプです。次の関数を定義して、イベントメッセージ、エラー、およびストリームクロージャを処理します。
on(event: 'ended', listener: StreamingOperationEndedListener)
-
ストリームが閉じたときに IPC クライアントが呼び出すコールバック。
on(event: 'streamError', listener: StreamingRpcErrorListener)
-
ストリームエラーが発生したときに IPC クライアントが呼び出すコールバック。
エラーの結果としてサブスクリプションストリームを閉じるには true を返し、ストリームを開いたままにするには false を返します。
on(event: 'message', listener: (message: InboundMessageType) => void)
-
IPC クライアントが MQTT メッセージやコンポーネント更新通知などのイベントメッセージを受信したときに呼び出すコールバック。
サブスクリプションハンドラーの例
次の例は、SubscribeToTopic オペレーションと、ローカルのパブリッシュ/サブスクライブメッセージにサブスクライブするためのサブスクリプションハンドラーの使用方法を示します。
- Java (IPC client V2)
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例: ローカルのパブリッシュ/サブスクライブメッセージをサブスクライブする
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
IPC ベストプラクティス
カスタムコンポーネントで IPC を使用する場合のベストプラクティスは、IPC クライアント V1 と IPC クライアント V2 とで異なります。使用する IPC クライアントのバージョンのベストプラクティスに従ってください。
- IPC client V2
-
IPC クライアント V2 は、別のスレッドでコールバック関数を実行するため、IPC クライアント V1 と比較すると、IPC を使用する場合や、サブスクリプションハンドラ関数を記述する場合に、従うべきガイドラインが少なくなります。
-
1 つの IPC クライアントを再利用する
IPC クライアントを作成したら、そのクライアントを開いたままにして、全ての IPC オペレーションに再利用します。複数のクライアントを作成すると、リソースが余分に消費され、リソースリークが発生する可能性があります。
-
例外を処理する
IPC クライアント V2 は、サブスクリプションハンドラー関数でキャッチされない例外をログに記録します。コードで発生したエラーを処理するには、ハンドラー関数で例外をキャッチする必要があります。
- IPC client V1
-
IPC クライアント V1 は単一のスレッドを使用し、IPC サーバーとの通信とサブスクリプションハンドラーの呼び出しを行います。サブスクリプションハンドラー関数を記述するときは、この同期動作を考慮する必要があります。
-
1 つの IPC クライアントを再利用する
IPC クライアントを作成したら、そのクライアントを開いたままにして、全ての IPC オペレーションに再利用します。複数のクライアントを作成すると、リソースが余分に消費され、リソースリークが発生する可能性があります。
-
ブロッキングコードを非同期で実行する
IPC クライアント V1 は、スレッドがブロックされている間は、新しいリクエストの送信や新しいイベントメッセージの処理を行うことはできません。ブロッキングコードは、ハンドラー関数から実行する別個のスレッドで実行するべきです。ブロックコードには sleep
呼び出し、連続して実行されるループ、および完了までに時間がかかる同期 I/O リクエストなどがあります。
-
新しい IPC 要求を非同期で送信する
IPC クライアント V1 はサブスクリプションハンドラー関数内から新しいリクエストを送信できません。これは、応答を待機するとリクエストによってハンドラー関数がブロックされるためです。IPC リクエストの送信は、ハンドラー関数から実行する別個のスレッドで行うべきです。
-
例外を処理する
IPC クライアント V1 は、キャッチされない例外はサブスクリプションハンドラー関数で処理しません。ハンドラー関数が例外をスローすると、サブスクリプションが閉じて、例外はコンポーネントログに表示されません。ハンドラー関数で例外をキャッチして、サブスクリプションを開いたままにし、コードで発生したエラーをログに記録してください。