2023 年 6 月 30 日に AWS IoT Greengrass Version 1 は延長ライフサイクルフェーズに入りました。詳細については、「AWS IoT Greengrass V1 メンテナンスポリシー」を参照してください。この日付以降、AWS IoT Greengrass V1 の機能、拡張機能、バグ修正、またはセキュリティパッチを提供するアップデートはリリースされません。AWS IoT Greengrass V1 で稼働中のデバイスは中断されず、引き続き動作し、クラウドに接続できます。重要な新機能や新たなプラットフォームのサポートが追加された AWS IoT Greengrass Version 2 への移行を強くお勧めします。
AWS クラウド へのデータストリームエクスポート (コンソール)
このチュートリアルでは、AWS IoT コンソールを使用して、ストリームマネージャーを有効にし AWS IoT Greengrass グループを作成およびデプロイする方法について説明します。グループには、ストリームマネージャーのストリームに書き込むユーザー定義 Lambda 関数が含まれています。ストリームマネージャーは、自動的に AWS クラウド にエクスポートされます。
ストリームマネージャーにより、大量のデータストリームの取り込み、処理、エクスポートが効率化され、信頼性が向上します。このチュートリアルでは、IoT データを消費する TransferStream
Lambda 関数を作成します。Lambda 関数は、AWS IoT Greengrass Core SDK を使用して、ストリームマネージャーでストリームを作成し、ストリームでの読み取りと書き込みを行います。ストリームマネージャーは、ストリームを Kinesis Data Streams にエクスポートします。以下の図は、このワークフローを示しています。
このチュートリアルでは、ユーザー定義の Lambda 関数が AWS IoT Greengrass Core SDK 内の StreamManagerClient
オブジェクトを使用してストリームマネージャーとやり取りする方法について説明します。簡単にするために、このチュートリアルで作成する Python Lambda 関数は、シミュレートされたデバイスデータを生成します。
前提条件
このチュートリアルを完了するには、以下が必要です。
-
Greengrass グループと Greengrass コア (v1.10 以降)。Greengrass のグループまたはコアを作成する方法については、「AWS IoT Greengrass の開始方法」を参照してください。開始方法チュートリアルには、AWS IoT Greengrass Core ソフトウェアのインストール手順も含まれています。
注記
ストリームマネージャーは OpenWrt ディストリビューションではサポートされていません。
-
コアデバイスにインストールされている Java 8 ランタイム (JDK 8)。
-
Debian ベースのディストリビューション (Raspbian を含む) または Ubuntu ベースのディストリビューションの場合は、次のコマンドを実行します。
sudo apt install openjdk-8-jdk
-
Red Hat ベースのディストリビューション (Amazon Linux を含む) の場合は、次のコマンドを実行します。
sudo yum install java-1.8.0-openjdk
詳細については、OpenJDK ドキュメントの「How to download and install prebuilt OpenJDK packages
」を参照してください。
-
-
AWS IoT Greengrass Core SDK for Python v1.5.0 以降。AWS IoT Greengrass Core SDK for Python で
StreamManagerClient
を使用するには、以下を行う必要があります。-
Python 3.7 以降をコアデバイスにインストールします。
-
SDK とその依存関係を Lambda 関数デプロイパッケージに含めます。このチュートリアルでは、手順を説明しています。
ヒント
StreamManagerClient
を Java または NodeJS で使用できます。コードの例については、GitHub の 「AWS IoT Greengrass Core SDK for Java」と「AWS IoT Greengrass Core SDK for Node.js 」を参照してください。 -
-
Greengrass グループと同じ AWS リージョン の Amazon Kinesis Data Streams で作成された
MyKinesisStream
という名前の送信先ストリーム。詳細については、「Amazon Kinesis デベロッパーガイド」の「ストリームを作成する」を参照してください。注記
このチュートリアルでは、ストリームマネージャーが Kinesis Data Streams にデータをエクスポートするため、AWS アカウント に課金されます。料金の詳細については、「Amazon Kinesis Data Streams の料金
」を参照してください。 料金が発生しないようにするには、Kinesis データストリームを作成せずにこのチュートリアルを実行します。この場合、ログを確認して、ストリームマネージャーがストリームを Kinesis Data Streams にエクスポートしようとしたことを確認します。
-
次の例に示すように、ターゲットデータストリームで
kinesis:PutRecords
アクションを許可する Greengrass グループのロール に IAM ポリシーが追加されている。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:
region
:account-id
:stream/MyKinesisStream" ] } ] }
このチュートリアルのおおまかな手順は以下のとおりです。
このチュートリアルは完了までに約 20 分かかります。
ステップ 1: Lambda 関数デプロイパッケージを作成する
この手順では、Python 関数コードと依存関係を含む Lambda 関数デプロイパッケージを作成します。このパッケージは、後で AWS Lambda で Lambda 関数を作成するときにアップロードします。Lambda 関数は AWS IoT Greengrass Core SDK を使用して、ローカルストリームを作成および操作します。
注記
ユーザー定義の Lambda 関数は、AWS IoT Greengrass Core SDK を使用してストリームマネージャーと対話する必要があります。Greengrass ストリームマネージャーの要件の詳細については、「Greengrass ストリームマネージャーの要件」を参照してください。
-
v1.5.0 以降の AWS IoT Greengrass Core SDK for Python をダウンロードします。
-
ダウンロードしたパッケージを解凍し、SDK を取得します。SDK は
greengrasssdk
フォルダです。 -
パッケージ依存関係をインストールして、Lambda 関数デプロイパッケージに SDK を含めます。
-
requirements.txt
ファイルのある SDK ディレクトリに移動します。このファイルには依存関係の一覧が記載されています。 -
SDK の依存関係をインストールします。例えば、次の
pip
コマンドを実行して、現在のディレクトリにインストールします。pip install --target . -r requirements.txt
-
-
以下の Python コード関数を
transfer_stream.py
というローカルファイルに保存します。ヒント
Java と NodeJS を使用するコードの例については、GitHub の「AWS IoT Greengrass Core SDK for Java
」と「AWS IoT Greengrass Core SDK for Node.js 」を参照してください。 import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
-
以下の項目を
transfer_stream_python.zip
という名前のファイルに圧縮します。これが Lambda 関数デプロイパッケージです。-
transfer_stream.py。アプリケーションロジック。
-
greengrasssdk。MQTT メッセージを発行する Python Greengrass Lambda 関数で必須のライブラリ。
ストリームマネージャーの操作は、AWS IoT Greengrass Core SDK for Python のバージョン 1.5.0 以降で使用できます。
-
AWS IoT Greengrass Core SDK for Python にインストールした依存関係 (例えば、
cbor2
ディレクトリ)。
zip
ファイルを作成するときは、これらの項目のみを含み、それらが含まれているフォルダは含みません。 -
ステップ 2: Lambda 関数を作成する
このステップでは、AWS Lambda コンソールを使用して Lambda 関数を作成し、デプロイパッケージを使用するようにその関数を設定します。次に、関数のバージョンを公開し、エイリアスを作成します。
-
最初に Lambda 関数を作成します。
-
AWS Management Consoleで、[サービス] を選択し、AWS Lambda コンソールを開きます。
-
[Create function] (関数の作成) を選択し、[Author from scratch] (一から作成) を選択します。
-
[基本的な情報] セクションで、以下の値を使用します。
-
[関数名] に
TransferStream
と入力します。 -
[ランタイム] で [Python 3.7] を選択します。
-
[アクセス許可] はデフォルト設定のままにしておきます。これで Lambda への基本的なアクセス許可を付与する実行ロールが作成されます。このロールは、AWS IoT Greengrass によっては使用されません。
-
-
ページの下部で、[関数の作成] を選択します。
-
-
今度は、ハンドラを登録し、Lambda 関数デプロイパッケージをアップロードします。
-
[Code] (コード) タブの [Code source] (コードソース) で、[Upload from] (アップロード元) を選択します。ドロップダウンから [.zip ファイル] を選択します。
-
[Upload] (アップロード) を選択し、
transfer_stream_python.zip
デプロイパッケージを選択します。次に、[保存] を選択します。 -
関数の [Code] (コード) タブにある [Runtime settings] (ランタイム設定) で [Edit] (編集) を選択し、次の値を入力します。
-
[ランタイム] で [Python 3.7] を選択します。
-
[ハンドラ] に
transfer_stream.function_handler
と入力します。
-
-
[保存] を選択します。
注記
AWS Lambda コンソールの [Test] (テスト) ボタンは、この関数では機能しません。AWS IoT Greengrass Core SDK には、AWS Lambda コンソールで Greengrass Lambda 関数を個別に実行するために必要なモジュールは含まれていません。これらのモジュール (例えば
greengrass_common
) が関数に提供されるのは、Greengrass Core にデプロイされた後になります。
-
-
ここで、Lambda 関数の最初のバージョンを公開し、バージョンのエイリアスを作成します。
注記
Greengrass グループは、Lambda 関数をエイリアス別 (推奨) またはバージョン別に参照できます。エイリアスを使用すると、関数コードを更新する時にサブスクリプションテーブルやグループ定義を変更する必要がないため、コード更新を簡単に管理できます。その代わりに、新しい関数バージョンにエイリアスを指定するだけで済みます。
-
[TransferStream: 1] 設定ページで、[アクション] メニューの [エイリアスを作成] を選択します。
-
[新しいエイリアスの作成] ページで、次の値を使用します。
-
[名前] に
GG_TransferStream
と入力します。 -
[バージョン] で、[1] を選択します。
注記
AWS IoT Greengrass は、$LATEST バージョンの Lambda エイリアスをサポートしていません。
-
-
[作成] を選択します。
-
これで、Greengrass グループに Lambda 関数を追加する準備ができました。
ステップ 3: Greengrass グループに Lambda 関数を追加する
このステップでは、Lambda 関数をグループに追加し、そのライフサイクルと環境変数を設定します。詳細については、「グループ固有の設定による Greengrass Lambda 関数の実行の制御」を参照してください。
AWS IoTコンソールのナビゲーションペインの [Manage] (管理) で、 [Greengrass devices] (Greengrass デバイス) を展開して、[Groups (V1)] (グループ 〔V1〕) を選択します。
ターゲットグループを選択します。
-
グループ設定ページで、[Lambda functions] (Lambda 関数) タブを選択します。
-
[My Lambda functions] (自分の Lambda 関数) で、[Add] (追加)を選択します。
-
[Lambda 関数の追加] ページで、Lambda 関数用の [Lambda 関数] を選択します。
-
Lambda バージョンでは、Alias:GG_TransferStream を選択します。
ここで、Greengrass グループの Lambda 関数の動作を決定するプロパティを設定します。
-
[Lambda function configuration] (Lambda 関数の設定) セクションで、次のように変更します。
-
[メモリ制限] を 32 MB に設定します。
-
[Pinned] (固定)で、[True] を選択します。
注記
存続期間の長い (または固定された) Lambda 関数は、AWS IoT Greengrass の起動後に自動的に開始され、独自のコンテナで実行され続けます。これはオンデマンド Lambda 関数とは対照的です。この関数は呼び出されたときに開始し、実行するタスクが残っていないときに停止します。詳細については、「Greengrass Lambda 関数のライフサイクル設定」を参照してください。
-
-
[Add Lambda function] (Lambda 関数の追加) を選択します。
ステップ 4: ストリームマネージャーを有効にする
この手順では、ストリームマネージャーが有効になっていることを確認します。
-
グループ設定ページで、[Lambda functions] (Lambda 関数) タブを選択します。
-
[System Lambda functions] (システム Lambda 関数) で、 [Stream manager] (ストリームマネージャー) を選択し、ステータスを確認します。無効になっている場合は、[編集] を選択します。次に、[有効化] を選択し、[保存] を選択します。このチュートリアルでは、デフォルトのパラメータ設定を使用できます。詳細については、「AWS IoT Greengrass ストリームマネージャーの設定」を参照してください。
注記
コンソールを使用してストリームマネージャーを有効にし、グループをデプロイすると、ストリームマネージャーのメモリサイズはデフォルトで 4194304 KB (4 GB) に設定されます。メモリのサイズは 128000 KB 以上に設定することをお勧めします。
ステップ 5: ローカルログを設定する
このステップでは、コアデバイスのファイルシステムにログを書き込むように、グループ内の AWS IoT Greengrass システムコンポーネント、ユーザー定義 Lambda 関数、コネクタを設定します。ログを使用して、発生する可能性のある問題のトラブルシューティングを行うことができます。詳細については、「AWS IoT Greengrass ログでのモニタリング」を参照してください。
ステップ 6: Greengrass グループをデプロイする
Core デバイスにグループをデプロイします。
ステップ 7: アプリケーションをテストする
この TransferStream
Lambda 関数は、シミュレートされたデバイスデータを生成します。ストリームマネージャーがターゲットの Kinesis データストリームにエクスポートするストリームにデータを書き込みます。
-
Amazon Kinesis コンソールの [Kinesis data streams] (Kinesis データストリーム) で、[MyKinesisStream] を選択します。
注記
ターゲットの Kinesis データストリームを使用せずにチュートリアルを実行した場合は、ストリームマネージャーのログファイルを確認します (
GGStreamManager
)。エラーメッセージにexport stream MyKinesisStream doesn't exist
が含まれている場合、テストは成功します。このエラーは、サービスがストリームにエクスポートしようとしましたが、ストリームが存在しないことを意味します。 -
[MyKinesisStream] ページで、[Monitoring (モニタリング)] を選択します。テストが成功すると、Put Records (レコードの配置) グラフにデータが表示されます。接続によっては、データが表示されるまでに 1 分かかることがあります。
重要
テストが終了したら、Kinesis データストリームを削除して、それ以上の料金が発生しないようにします。
または、次のコマンドを実行して Greengrass デーモンを停止します。これにより、テストを続行する準備が整うまで、コアがメッセージを送信できなくなります。
cd /greengrass/ggc/core/ sudo ./greengrassd stop
-
コアから TransferStream Lambda 関数を削除します。
AWS IoT コンソールのナビゲーションペインの [Manage] (管理) で、[Greengrass devices] (Greengrass デバイス) を展開して、[Groups (V1)] (グループ (V1))を選択します。
-
[Greengrass グループ] で、目的のグループを選択します。
-
[Lambdas] ページで、[TransferStream] 関数の省略記号 (…)、[Remove function] (関数の削除) の順に選択します。
-
[アクション] で、[デプロイ] を選択します。
ロギング情報を表示したり、ストリームに関する問題をトラブルシューティングしたりするには、TransferStream
および GGStreamManager
関数のログを確認します。ファイルシステム上の AWS IoT Greengrass ログを読み取るには、root
権限が必要です。
-
TransferStream
は、ログエントリを
に書き込みます。greengrass-root
/ggc/var/log/user/region
/account-id
/TransferStream.log -
GGStreamManager
は、ログエントリを
に書き込みます。greengrass-root
/ggc/var/log/system/GGStreamManager.log
トラブルシューティングの詳細が必要な場合は、[User Lambda logs] (ユーザー Lambda ログ) のログレベルを [Debug logs] (デバッグログ) に設定してから、グループを再度デプロイできます。