IoT Greengrass を使用してコスト効率よく AWS IoT データを Amazon S3 に直接取り込む - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

IoT Greengrass を使用してコスト効率よく AWS IoT データを Amazon S3 に直接取り込む

作成者: Sebastian Viviani (AWS) と Rizwan Syed (AWS)

環境:PoC またはパイロット

テクノロジー: 分析、IoT

ワークロード: オープンソース

AWS サービス: AWS IoT Greengrass、Amazon S3、Amazon Athena

[概要]

このパターンは、AWSIoT Greengrass バージョン 2 デバイスを使用して、モノのインターネット (IoT ) データを Amazon Simple Storage Service (Amazon S3) バケットに直接コスト効率よく取り込む方法を示しています。デバイスは、IoT データを読み取り、データを永続ストレージ (つまり、ローカルディスクまたはボリューム) に保存するカスタムコンポーネントを実行します。次に、デバイスは IoT データを Apache Parquet ファイルに圧縮し、そのデータを定期的に S3 バケットにアップロードします。

取り込むIoTデータの量と速度は、エッジハードウェアの機能とネットワーク帯域幅によってのみ制限されます。Amazon Athena を使用すれば、取り込んだデータをコスト効率よく分析することができます。Athena は、圧縮された Apache Parquet ファイルと「Amazon Managed Grafana」によるデータの視覚化をサポートしています。

前提条件と制限

前提条件

制約事項

  • このパターンのデータは S3 バケットにリアルタイムでアップロードされません。遅延期間があり、遅延期間を設定できます。データは一時的にエッジデバイスにバッファされ、期間が終了するとアップロードされます。

  • SDK は、Java、Node.js、および Python でのみ使用できます。

アーキテクチャ

ターゲットテクノロジースタック

  • Amazon S3

  • AWS IoT Greengrass

  • MQTT ブローカー

  • ストリームマネージャーコンポーネント

ターゲットアーキテクチャ

次の図は、IoT センサーデータを取り込み、そのデータを S3 バケットに保存するように設計されたアーキテクチャを示しています。

アーキテクチャ図

この図表は、次のワークフローを示しています:

  1. 複数のセンサー (例: 温度や泡沫) の更新がローカルMQTTブローカーに発行されます。

  2. これらのセンサーにサブスクライブされている Parquet ファイルコンプレッサーは、トピックを更新し、更新を受信します。

  3. Parquet ファイルコンプレッサーは更新をローカルに保存します。

  4. 期間が経過すると、保存されたファイルは Parquet ファイルに圧縮され、ストリームマネージャーに渡され、指定された S3 バケットにアップロードされます。

  5. ストリームマネージャーは Parquet ファイルを S3 バケットにアップロードします。

:ストリームマネージャー (StreamManager) は管理対象コンポーネントです。Amazon S3 にデータをエクスポートする方法の例については、 AWS IoT Greengrass ドキュメントの「ストリームマネージャー」を参照してください。ローカルMQTTブローカーは、コンポーネントとして、または Eclipse Mosquitto などの別のブローカーとして使用できます。

ツール

AWS ツール

  • Amazon Athena は、標準 を使用して Amazon S3 内のデータを直接分析するのに役立つインタラクティブなクエリサービスですSQL。

  • Amazon Simple Storage Service (Amazon S3) は、どのようなデータ量であっても、データを保存、保護、取得することを支援するクラウドベースのオブジェクトストレージサービスです。

  • AWS IoT Greengrass は、デバイスで IoT アプリケーションを構築、デプロイ、管理するのに役立つオープンソースの IoT エッジランタイムおよびクラウドサービスです。

その他のツール

  • Apache Parquet」は、ストレージとデータの取得を目的として設計されたオープンソースの列指向データファイル形式です。

  • MQTT (Message Queuing Telemetry Transport) は、制約のあるデバイス用に設計された軽量メッセージングプロトコルです。

ベストプラクティス

アップロードされたデータには適切なパーティション形式を使用してください。

S3 バケットのルートプレフィックス名 (たとえば、"myAwesomeDataSet/""dataFromSource") には特定の要件はありませんが、データセットの目的がわかりやすいように、わかりやすいパーティションとプレフィックスを使用することをお勧めします。

また、クエリがデータセットで最適に実行されるように、Amazon S3 では適切なパーティション分割を使用することをお勧めします。次の例では、各 Athena クエリでスキャンされるデータ量が最適化されるように、データを HIVE形式でパーティション化しています。これにより、パフォーマンスを向上させ、コストを削減できます。

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

エピック

タスク説明必要なスキル

S3 バケットを作成する。

  1. S3 バケットを作成するか、既存のバケットを使用します。

  2. IoT データを取り込む S3 バケットにわかりやすい「プレフィックス」を作成します (例:s3:\\<bucket>\<prefix>)。

  3. 後で使用するためにプレフィックスを記録します。

アプリ開発者

S3 バケットにアクセスIAM許可を追加します。

前に作成した S3 バケットとプレフィックスへの書き込みアクセスをユーザーに許可するには、AWSIoT Greengrass ロールに次のIAMポリシーを追加します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

詳細については、Aurora ドキュメントのAmazon S3リソースにアクセスするための IAMポリシーの作成」を参照してください。

次に、S3 バケットのリソースポリシー (必要な場合) を更新して、正しいAWSプリンシパル による書き込みアクセスを許可します。

アプリ開発者
タスク説明必要なスキル

コンポーネントのレシピを更新します。

デプロイを作成」するときは、次の例に基づいて「コンポーネント設定を更新」します。

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

をAWSリージョン<region>に、 を定期的な間隔<period>に、 を S3 バケット<s3Bucket>に、 をプレフィックス<s3prefix>に置き換えます。

アプリ開発者

コンポーネントを作成します。

次のいずれかを行います。

  • コンポーネントを作成します。

  • CI/CD パイプライン (存在する場合) にコンポーネントを追加します。アーティファクトリポジトリから AWS IoT Greengrass アーティファクトバケットにアーティファクトをコピーしてください。次に、AWSIoT Greengrass コンポーネントを作成または更新します。

  • MQTT ブローカーをコンポーネントとして追加するか、後で手動で追加します。:この決定は、ブローカーで使用できる認証スキームに影響します。ブローカーを手動で追加すると、ブローカーが AWS IoT Greengrass から切り離され、ブローカーでサポートされている認証スキームが有効になります。AWS 提供されるブローカーコンポーネントには、事前定義された認証スキームがあります。詳細については、MQTT「3.1.1 ブローカー (モケット)」およびMQTT「5 ブローカー (EMQX)」を参照してください。

アプリ開発者

MQTT クライアントを更新します。

このサンプルコードでは、コンポーネントはブローカーにローカルに接続するため、認証を使用していません。シナリオが異なる場合は、必要に応じてMQTTクライアントセクションを更新します。さらに、以下の作業を行います。

  1. サブスクリプションのMQTTトピックを更新します。

  2. 各ソースからのMQTTメッセージが異なる可能性があるため、必要に応じてメッセージパーサーを更新します。

アプリ開発者
タスク説明必要なスキル

コアデバイスのデプロイを更新します。

AWS IoT Greengrass バージョン 2 コアデバイスのデプロイが既に存在する場合は、デプロイ を修正します。デプロイが存在しない場合は、「新しいデプロイを作成」します。

コンポーネントに正しい名前を付けるには、以下に基づいて新しいコンポーネントの「ログマネージャー設定を更新」します (必要に応じて) 。

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

最後に、AWSIoT Greengrass コアデバイスのデプロイのリビジョンを完了します。

アプリ開発者
タスク説明必要なスキル

AWS IoT Greengrass ボリュームのログを確認します。

以下を確認します。

  • MQTT クライアントがローカルMQTTブローカーに正常に接続されました。

  • MQTT クライアントは正しいトピックにサブスクライブされています。

  • センサーの更新メッセージは、 MQTTトピックでブローカーに送信されています。

  • Parquet の圧縮は定期的に行われます。

アプリ開発者

S3 バケットを確認します。

S3 バケットにデータがアップロードされているかどうかを確認します。各期間にアップロードされているファイルを確認できます。

次のセクションでデータをクエリすることで、データが S3 バケットにアップロードされたかどうかを確認することもできます。

アプリ開発者
タスク説明必要なスキル

データベースとテーブルを作成します。

  1. AWS Glue データベースを作成します (必要な場合)。

  2. AWS Glue でテーブルを手動で作成https://docs.aws.amazon.com/glue/latest/dg/tables-described.htmlするか、 Glue AWS でクローラを実行します。

アプリ開発者

Athena にデータへのアクセスを許可します。

  1. Athena が S3 バケットにアクセスすることを許可する権限を更新します。詳細については、Athena ドキュメントの Glue Data Catalog AWS の「データベースとテーブルへのきめ細かなアクセス」を参照してください。

  2. データベース内のテーブルをクエリします。

アプリ開発者

トラブルシューティング

問題ソリューション

MQTT クライアントが接続に失敗する

MQTT クライアントがサブスクライブに失敗する

MQTT ブローカーに対するアクセス許可を検証します。MQTT のブローカーがある場合はAWS、MQTT「3.1.1 ブローカー (モケット)」とMQTT「5 ブローカー (EMQX)」を参照してください。

パーケットファイルは作成されません。

  • MQTT トピックが正しいことを確認します。

  • センサーからのMQTTメッセージが正しい形式であることを確認します。

オブジェクトは S3 バケットにアップロードされません。

  • インターネット接続とエンドポイント接続があることを確認します。

  • S3 バケットのリソースポリシーが正しいことを確認します。

  • AWS IoT Greengrass バージョン 2 コアデバイスロールのアクセス許可を確認します。

関連リソース

追加情報

コスト分析

次のコスト分析シナリオは、このパターンでカバーされるデータインジェストアプローチが AWS クラウドのデータインジェストコストにどのように影響するかを示しています。このシナリオの料金例は、公開時の価格に基づいています。料金は変更されることがあります。さらに、コストは、AWSリージョン、AWSサービスクォータ、およびクラウド環境に関連するその他の要因によって異なる場合があります。

入力信号セット

この分析では、IoT の取り込みコストを他の利用可能な代替手段と比較するための基礎として、以下の入力信号セットを使用します。

シグナル数。

[Frequency] (頻度)

1 信号あたりのデータ

125

25 ヘルツ

8 バイト

このシナリオでは、システムは 125 個の信号を受信します。各信号は 8 バイトで、40 ミリ秒 (25 Hz) ごとに発生します。これらの信号は、個別に受信することも、共通のペイロードにまとめて送信することもできます。これらの信号は、必要に応じて分割してパックすることができます。レイテンシーも決定できます。レイテンシーは、データの受信、蓄積、および取り込みにかかる時間で構成されます。

比較のため、このシナリオの取り込みオペレーションは us-east-1AWSリージョンに基づいています。コスト比較は AWSサービスにのみ適用されます。ハードウェアや接続などの他のコストは、分析には考慮されません。

コスト比較

次の表は、各取り込み方法の月額コストを米ドル (USD) で示しています。

方法

月額コスト

AWS IoT SiteWise *

331.77 USD

AWS データ処理パック付き IoT SiteWise Edge (すべてのデータをエッジに保持)

200 USD

AWS raw データにアクセスするための IoT Core および Amazon S3 ルール

84.54 USD

エッジでの寄木細工ファイルの圧縮と Amazon S3 へのアップロード

0.5 USD

*サービスクォータを満たすには、データをダウンサンプリングする必要があります。つまり、この方法ではデータの一部が失われるということです。

代替方法

このセクションでは、以下の代替方法の等価コストを示します。

  • AWS IoT SiteWise – 各シグナルは、個々のメッセージでアップロードする必要があります。したがって、1 か月あたりのメッセージの総数は 125×25×3600×24×30、つまり 1 か月あたり 81 億メッセージになります。ただし、AWSIoT SiteWise はプロパティごとに 1 秒あたり 10 個のデータポイントしか処理できません。データが 10 Hz にダウンサンプリングされると仮定すると、1 か月あたりのメッセージ数は 125×10×3600×24×30、つまり 32.4 億に減少します。測定値を 10 (100 万件のメッセージUSDあたり 1 件) のグループにパックするパブリッシャーコンポーネントを使用すると、1 か月あたり 324 USD件の月額料金が発生します。各メッセージが 8 バイト (1 Kb/125) であると仮定すると、25.92 GB のデータストレージになります。これにより、1 か月USDあたり 7.77 の月額コストが追加されます。最初の 1 か月の合計コストは 331.77 でUSD、USD毎月 7.77 増加します。

  • AWS エッジで完全に処理された (つまり、クラウド取り込みがない) すべてのモデルとシグナルを含むデータ処理パックを備えた IoT SiteWise Edge – データ処理パックを代替として使用してコストを削減し、エッジで計算されるすべてのモデルを設定できます。これは、実際の計算を行わなくても、保存と視覚化のためだけに機能します。この場合、エッジゲートウェイには強力なハードウェアを使用する必要があります。固定コストは 1 か月USDあたり 200 です。

  • による AWS IoT Core への直接取り込みMQTTと、raw データを Amazon S3 に保存するための IoT ルール — すべてのシグナルが共通のペイロードで発行され、AWSIoT Core に発行されたメッセージの合計数が 25×3600×24×30、または 1 か月あたり 6480 万であるとします。メッセージ 100 万USD件あたり 1 件の場合、月額料金は 64.8 USD です。100 万ルールのアクティベーションUSDあたり 0.15 で、メッセージごとに 1 つのルールがある場合、1 か月USDあたり 19.44 の月額コストが追加されます。Amazon S3 のストレージ 1 Gb USDあたり 0.023 のコストで、1 か月USDあたりさらに 1.5 が追加されます (新しいデータを反映するために毎月増加)。 Amazon S3 最初の 1 か月の合計コストは 84.54 でUSD、USD毎月 1.5 ずつ増加します。

  • Parquetファイルの端でデータを圧縮してAmazon S3 にアップロードする(提案方法)— 圧縮率はデータの種類によって異なります。に対してテストされたのと同じ産業用データではMQTT、1 か月の合計出力データは 1.2 Gb です。料金は 1 か月USDあたり 0.03 です。他のベンチマークで説明されている圧縮率 (ランダムデータを使用) は、約 66% (最悪のシナリオに近い) です。合計データは 21 Gb で、コストは 1 か月USDあたり 0.5 です。

Parquetファイルジェネレーター

次のコード例は、Python で記述された Parquet ファイルジェネレーターの構造を示しています。このコード例は説明のみを目的としており、ご使用の環境に貼り付けても動作しません。

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)