AWS クラウド へのデータストリームエクスポート (コンソール) - AWS IoT Greengrass

2023 年 6 月 30 日に AWS IoT Greengrass Version 1 は延長ライフサイクルフェーズに入りました。詳細については、「AWS IoT Greengrass V1 メンテナンスポリシー」を参照してください。この日付以降、AWS IoT Greengrass V1 の機能、拡張機能、バグ修正、またはセキュリティパッチを提供するアップデートはリリースされません。AWS IoT Greengrass V1 で稼働中のデバイスは中断されず、引き続き動作し、クラウドに接続できます。重要な新機能新たなプラットフォームのサポートが追加された AWS IoT Greengrass Version 2 への移行を強くお勧めします。

AWS クラウド へのデータストリームエクスポート (コンソール)

このチュートリアルでは、AWS IoT コンソールを使用して、ストリームマネージャーを有効にし AWS IoT Greengrass グループを作成およびデプロイする方法について説明します。グループには、ストリームマネージャーのストリームに書き込むユーザー定義 Lambda 関数が含まれています。ストリームマネージャーは、自動的に AWS クラウド にエクスポートされます。

ストリームマネージャーにより、大量のデータストリームの取り込み、処理、エクスポートが効率化され、信頼性が向上します。このチュートリアルでは、IoT データを消費する TransferStream Lambda 関数を作成します。Lambda 関数は、AWS IoT Greengrass Core SDK を使用して、ストリームマネージャーでストリームを作成し、ストリームでの読み取りと書き込みを行います。ストリームマネージャーは、ストリームを Kinesis Data Streams にエクスポートします。以下の図は、このワークフローを示しています。

ストリーム管理ワークフローの図。

このチュートリアルでは、ユーザー定義の Lambda 関数が AWS IoT Greengrass Core SDK 内の StreamManagerClient オブジェクトを使用してストリームマネージャーとやり取りする方法について説明します。簡単にするために、このチュートリアルで作成する Python Lambda 関数は、シミュレートされたデバイスデータを生成します。

前提条件

このチュートリアルを完了するには、以下が必要です。

  • Greengrass グループと Greengrass コア (v1.10 以降)。Greengrass のグループまたはコアを作成する方法については、「AWS IoT Greengrass の開始方法」を参照してください。開始方法チュートリアルには、AWS IoT Greengrass Core ソフトウェアのインストール手順も含まれています。

    注記

    ストリームマネージャーは OpenWrt ディストリビューションではサポートされていません。

  • コアデバイスにインストールされている Java 8 ランタイム (JDK 8)。

    • Debian ベースのディストリビューション (Raspbian を含む) または Ubuntu ベースのディストリビューションの場合は、次のコマンドを実行します。

      sudo apt install openjdk-8-jdk
    • Red Hat ベースのディストリビューション (Amazon Linux を含む) の場合は、次のコマンドを実行します。

      sudo yum install java-1.8.0-openjdk

      詳細については、OpenJDK ドキュメントの「How to download and install prebuilt OpenJDK packages」を参照してください。

  • AWS IoT Greengrass Core SDK for Python v1.5.0 以降。AWS IoT Greengrass Core SDK for Python で StreamManagerClient を使用するには、以下を行う必要があります。

    • Python 3.7 以降をコアデバイスにインストールします。

    • SDK とその依存関係を Lambda 関数デプロイパッケージに含めます。このチュートリアルでは、手順を説明しています。

    ヒント

    StreamManagerClient を Java または NodeJS で使用できます。コードの例については、GitHub の 「AWS IoT Greengrass Core SDK for Java」と「AWS IoT Greengrass Core SDK for Node.js」を参照してください。

  • Greengrass グループと同じ AWS リージョン の Amazon Kinesis Data Streams で作成された MyKinesisStream という名前の送信先ストリーム。詳細については、「Amazon Kinesis デベロッパーガイド」の「ストリームを作成する」を参照してください。

    注記

    このチュートリアルでは、ストリームマネージャーが Kinesis Data Streams にデータをエクスポートするため、AWS アカウント に課金されます。料金の詳細については、「Amazon Kinesis Data Streams の料金」を参照してください。

    料金が発生しないようにするには、Kinesis データストリームを作成せずにこのチュートリアルを実行します。この場合、ログを確認して、ストリームマネージャーがストリームを Kinesis Data Streams にエクスポートしようとしたことを確認します。

  • 次の例に示すように、ターゲットデータストリームで kinesis:PutRecords アクションを許可する Greengrass グループのロール に IAM ポリシーが追加されている。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

このチュートリアルのおおまかな手順は以下のとおりです。

このチュートリアルは完了までに約 20 分かかります。

ステップ 1: Lambda 関数デプロイパッケージを作成する

この手順では、Python 関数コードと依存関係を含む Lambda 関数デプロイパッケージを作成します。このパッケージは、後で AWS Lambda で Lambda 関数を作成するときにアップロードします。Lambda 関数は AWS IoT Greengrass Core SDK を使用して、ローカルストリームを作成および操作します。

注記

ユーザー定義の Lambda 関数は、AWS IoT Greengrass Core SDK を使用してストリームマネージャーと対話する必要があります。Greengrass ストリームマネージャーの要件の詳細については、「Greengrass ストリームマネージャーの要件」を参照してください。

  1. v1.5.0 以降の AWS IoT Greengrass Core SDK for Python をダウンロードします。

  2. ダウンロードしたパッケージを解凍し、SDK を取得します。SDK は greengrasssdk フォルダです。

  3. パッケージ依存関係をインストールして、Lambda 関数デプロイパッケージに SDK を含めます。

    1. requirements.txt ファイルのある SDK ディレクトリに移動します。このファイルには依存関係の一覧が記載されています。

    2. SDK の依存関係をインストールします。例えば、次の pip コマンドを実行して、現在のディレクトリにインストールします。

      pip install --target . -r requirements.txt
  4. 以下の Python コード関数を transfer_stream.py というローカルファイルに保存します。

    ヒント

    Java と NodeJS を使用するコードの例については、GitHub の「AWS IoT Greengrass Core SDK for Java」と「AWS IoT Greengrass Core SDK for Node.js」を参照してください。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 以下の項目を transfer_stream_python.zip という名前のファイルに圧縮します。これが Lambda 関数デプロイパッケージです。

    • transfer_stream.py。アプリケーションロジック。

    • greengrasssdk。MQTT メッセージを発行する Python Greengrass Lambda 関数で必須のライブラリ。

      ストリームマネージャーの操作は、AWS IoT Greengrass Core SDK for Python のバージョン 1.5.0 以降で使用できます。

    • AWS IoT Greengrass Core SDK for Python にインストールした依存関係 (例えば、cbor2 ディレクトリ)。

    zip ファイルを作成するときは、これらの項目のみを含み、それらが含まれているフォルダは含みません。

ステップ 2: Lambda 関数を作成する

このステップでは、AWS Lambda コンソールを使用して Lambda 関数を作成し、デプロイパッケージを使用するようにその関数を設定します。次に、関数のバージョンを公開し、エイリアスを作成します。

  1. 最初に Lambda 関数を作成します。

    1. AWS Management Consoleで、[サービス] を選択し、AWS Lambda コンソールを開きます。

    2. [Create function] (関数の作成) を選択し、[Author from scratch] (一から作成) を選択します。

    3. [基本的な情報] セクションで、以下の値を使用します。

      • [関数名]TransferStream と入力します。

      • [ランタイム][Python 3.7] を選択します。

      • [アクセス許可] はデフォルト設定のままにしておきます。これで Lambda への基本的なアクセス許可を付与する実行ロールが作成されます。このロールは、AWS IoT Greengrass によっては使用されません。

    4. ページの下部で、[関数の作成] を選択します。

  2. 今度は、ハンドラを登録し、Lambda 関数デプロイパッケージをアップロードします。

    1. [Code] (コード) タブの [Code source] (コードソース) で、[Upload from] (アップロード元) を選択します。ドロップダウンから [.zip ファイル] を選択します。

      [.zip ファイル] が強調表示された [アップロード元] ドロップダウンリスト。
    2. [Upload] (アップロード) を選択し、transfer_stream_python.zip デプロイパッケージを選択します。次に、[保存] を選択します。

    3. 関数の [Code] (コード) タブにある [Runtime settings] (ランタイム設定) で [Edit] (編集) を選択し、次の値を入力します。

      • [ランタイム][Python 3.7] を選択します。

      • [ハンドラ]transfer_stream.function_handler と入力します。

    4. [保存] を選択します。

      注記

      AWS Lambda コンソールの [Test] (テスト) ボタンは、この関数では機能しません。AWS IoT Greengrass Core SDK には、AWS Lambda コンソールで Greengrass Lambda 関数を個別に実行するために必要なモジュールは含まれていません。これらのモジュール (例えば greengrass_common) が関数に提供されるのは、Greengrass Core にデプロイされた後になります。

  3. ここで、Lambda 関数の最初のバージョンを公開し、バージョンのエイリアスを作成します。

    注記

    Greengrass グループは、Lambda 関数をエイリアス別 (推奨) またはバージョン別に参照できます。エイリアスを使用すると、関数コードを更新する時にサブスクリプションテーブルやグループ定義を変更する必要がないため、コード更新を簡単に管理できます。その代わりに、新しい関数バージョンにエイリアスを指定するだけで済みます。

    1. [アクション] メニューから、[新しいバージョンを発行] を選択します。

    2. [バージョンの説明]First version と入力し、[発行] を選択します。

    3. [TransferStream: 1] 設定ページで、[アクション] メニューの [エイリアスを作成] を選択します。

    4. [新しいエイリアスの作成] ページで、次の値を使用します。

      • [名前]GG_TransferStream と入力します。

      • [バージョン] で、[1] を選択します。

      注記

      AWS IoT Greengrass は、$LATEST バージョンの Lambda エイリアスをサポートしていません。

    5. [作成] を選択します。

これで、Greengrass グループに Lambda 関数を追加する準備ができました。

ステップ 3: Greengrass グループに Lambda 関数を追加する

このステップでは、Lambda 関数をグループに追加し、そのライフサイクルと環境変数を設定します。詳細については、「グループ固有の設定による Greengrass Lambda 関数の実行の制御」を参照してください。

  1. AWS IoTコンソールのナビゲーションペインの [Manage] (管理) で、 [Greengrass devices] (Greengrass デバイス) を展開して、[Groups (V1)] (グループ 〔V1〕) を選択します。

  2. ターゲットグループを選択します。

  3. グループ設定ページで、[Lambda functions] (Lambda 関数) タブを選択します。

  4. [My Lambda functions] (自分の Lambda 関数) で、[Add] (追加)を選択します。

  5. [Lambda 関数の追加] ページで、Lambda 関数用の [Lambda 関数] を選択します。

  6. Lambda バージョンでは、Alias:GG_TransferStream を選択します。

    ここで、Greengrass グループの Lambda 関数の動作を決定するプロパティを設定します。

  7. [Lambda function configuration] (Lambda 関数の設定) セクションで、次のように変更します。

    • [メモリ制限] を 32 MB に設定します。

    • [Pinned] (固定)で、[True] を選択します。

    注記

    存続期間の長い (または固定された) Lambda 関数は、AWS IoT Greengrass の起動後に自動的に開始され、独自のコンテナで実行され続けます。これはオンデマンド Lambda 関数とは対照的です。この関数は呼び出されたときに開始し、実行するタスクが残っていないときに停止します。詳細については、「Greengrass Lambda 関数のライフサイクル設定」を参照してください。

  8. [Add Lambda function] (Lambda 関数の追加) を選択します。

ステップ 4: ストリームマネージャーを有効にする

この手順では、ストリームマネージャーが有効になっていることを確認します。

  1. グループ設定ページで、[Lambda functions] (Lambda 関数) タブを選択します。

  2. [System Lambda functions] (システム Lambda 関数) で、 [Stream manager] (ストリームマネージャー) を選択し、ステータスを確認します。無効になっている場合は、[編集] を選択します。次に、[有効化] を選択し、[保存] を選択します。このチュートリアルでは、デフォルトのパラメータ設定を使用できます。詳細については、「AWS IoT Greengrass ストリームマネージャーの設定」を参照してください。

注記

コンソールを使用してストリームマネージャーを有効にし、グループをデプロイすると、ストリームマネージャーのメモリサイズはデフォルトで 4194304 KB (4 GB) に設定されます。メモリのサイズは 128000 KB 以上に設定することをお勧めします。

ステップ 5: ローカルログを設定する

このステップでは、コアデバイスのファイルシステムにログを書き込むように、グループ内の AWS IoT Greengrass システムコンポーネント、ユーザー定義 Lambda 関数、コネクタを設定します。ログを使用して、発生する可能性のある問題のトラブルシューティングを行うことができます。詳細については、「AWS IoT Greengrass ログでのモニタリング」を参照してください。

  1. [ローカルログ設定] で、ローカルログが設定されているかどうかを確認します。

  2. Greengrass システムコンポーネントやユーザー定義 Lambda 関数のログが設定されていない場合は、[編集] を選択します。

  3. [User Lambda functions log level] (ユーザー Lambda 関数のログレベル) と [Greengrass system log level] (Greengrass システムのログレベル) を選択します。

  4. ログレベルとディスク容量制限のデフォルト値はそのままにし、[Save (保存)] を選択します。

ステップ 6: Greengrass グループをデプロイする

Core デバイスにグループをデプロイします。

  1. AWS IoT Greengrass Core が実行されていることを確認します。必要に応じて、Raspberry Pi のターミナルで以下のコマンドを実行します。

    1. デーモンが実行中かどうかを確認するには、以下を実行します。

      ps aux | grep -E 'greengrass.*daemon'

      出力に root で実行中の /greengrass/ggc/packages/ggc-version/bin/daemon のエントリが含まれていれば、デーモンは実行されています。

      注記

      パス内のバージョンは、コアデバイスにインストールされている AWS IoT Greengrass Core ソフトウェアのバージョンによって異なります。

    2. デーモンを開始するには、以下を実行します。

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. グループ設定ページで、[Deploy] (デプロイ) を選択します。

    1. [Lambda functions] (Lambda 関数) タブの [System Lambda functions] (システム Lambda 関数) セクションで、[IP detector] (IP ディテクター)、[Edit] (編集) の順に選択します。

    2. [IP ディテクターの設定を編集] ダイアログボックスで、[MQTT ブローカーエンドポイントを自動的に検出して上書きする] を選択します。

    3. [保存] を選択します。

      これにより、デバイスは、IP アドレス、DNS、ポート番号など、コアの接続情報を自動的に取得できます。自動検出が推奨されますが、AWS IoT Greengrass は手動で指定されたエンドポイントもサポートしています。グループが初めてデプロイされたときにのみ、検出方法の確認が求められます。

      注記

      プロンプトが表示されたら、Greengrass サービスロールの作成権限を付与し、そのロールを現在の AWS リージョンの AWS アカウントに関連付けます。このロールを付与することで、AWS IoT Greengrass は AWS サービスのリソースにアクセスできます。

      [デプロイ] ページには、デプロイのタイムスタンプ、バージョン ID、ステータスが表示されます。完了すると、デプロイのステータスが [完了] と表示されます。

      トラブルシューティングのヘルプについては、「AWS IoT Greengrass のトラブルシューティング」を参照してください。

ステップ 7: アプリケーションをテストする

この TransferStream Lambda 関数は、シミュレートされたデバイスデータを生成します。ストリームマネージャーがターゲットの Kinesis データストリームにエクスポートするストリームにデータを書き込みます。

  1. Amazon Kinesis コンソールの [Kinesis data streams] (Kinesis データストリーム) で、[MyKinesisStream] を選択します。

    注記

    ターゲットの Kinesis データストリームを使用せずにチュートリアルを実行した場合は、ストリームマネージャーのログファイルを確認します (GGStreamManager)。エラーメッセージに export stream MyKinesisStream doesn't exist が含まれている場合、テストは成功します。このエラーは、サービスがストリームにエクスポートしようとしましたが、ストリームが存在しないことを意味します。

  2. [MyKinesisStream] ページで、[Monitoring (モニタリング)] を選択します。テストが成功すると、Put Records (レコードの配置) グラフにデータが表示されます。接続によっては、データが表示されるまでに 1 分かかることがあります。

    重要

    テストが終了したら、Kinesis データストリームを削除して、それ以上の料金が発生しないようにします。

    または、次のコマンドを実行して Greengrass デーモンを停止します。これにより、テストを続行する準備が整うまで、コアがメッセージを送信できなくなります。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. コアから TransferStream Lambda 関数を削除します。

    1. AWS IoT コンソールのナビゲーションペインの [Manage] (管理) で、[Greengrass devices] (Greengrass デバイス) を展開して、[Groups (V1)] (グループ (V1))を選択します。

    2. [Greengrass グループ] で、目的のグループを選択します。

    3. [Lambdas] ページで、[TransferStream] 関数の省略記号 ()、[Remove function] (関数の削除) の順に選択します。

    4. [アクション] で、[デプロイ] を選択します。

ロギング情報を表示したり、ストリームに関する問題をトラブルシューティングしたりするには、TransferStream および GGStreamManager 関数のログを確認します。ファイルシステム上の AWS IoT Greengrass ログを読み取るには、root 権限が必要です。

  • TransferStream は、ログエントリを greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log に書き込みます。

  • GGStreamManager は、ログエントリを greengrass-root/ggc/var/log/system/GGStreamManager.log に書き込みます。

トラブルシューティングの詳細が必要な場合は、[User Lambda logs] (ユーザー Lambda ログ) のログレベル[Debug logs] (デバッグログ) に設定してから、グループを再度デプロイできます。

関連情報