使用 IoT Greengrass,以具成本效益的方式直接將 AWS IoT 資料擷取至 Amazon S3 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 IoT Greengrass,以具成本效益的方式直接將 AWS IoT 資料擷取至 Amazon S3

由 Sebastian Viviani (AWS) 和 Rizwan Syed (AWS) 建立

環境:PoC 或試行

技術:分析;IoT

工作負載:開放原始碼

AWS 服務:AWSIoT Greengrass;Amazon S3;Amazon Athena

Summary

此模式示範如何使用 AWS IoT Greengrass 第 2 版裝置,以經濟實惠的方式將物聯網 (IoT) 資料直接擷取到 Amazon Simple Storage Service (Amazon S3) 儲存貯體。裝置會執行自訂元件來讀取 IoT 資料,並將資料儲存在持久性儲存體 (即本機磁碟區或磁碟區) 中。然後,裝置會將 IoT 資料壓縮為 Apache Parquet 檔案,並定期將資料上傳到 S3 儲存貯體。

您擷取的 IoT 資料數量和速度僅受限於您的邊緣硬體功能和網路頻寬。您可以使用 Amazon Athena 以符合成本效益的方式分析擷取的資料。Athena 使用 Amazon Managed Grafana 支援壓縮的 Apache Parquet 檔案和資料視覺化。

先決條件和限制

先決條件

限制

  • 此模式中的資料不會即時上傳至 S3 儲存貯體。有延遲期間,您可以設定延遲期間。資料會在邊緣裝置中暫時緩衝,然後在期間到期後上傳。

  • SDK 僅適用於 Java、Node.js 和 Python。

架構

目標技術堆疊

  • Amazon S3

  • AWS IoT Greengrass

  • MQTT 代理程式

  • 串流管理員元件

目標架構

下圖顯示設計用來擷取 IoT 感應器資料的架構,並將該資料存放在 S3 儲存貯體中。

架構圖

該圖顯示以下工作流程:

  1. 多個感應器 (例如,溫度和閥) 更新會發佈給本機MQTT代理程式。

  2. 訂閱這些感應器的 Parquet 檔案壓縮器會更新主題並接收這些更新。

  3. Parquet 檔案壓縮器會在本機儲存更新。

  4. 期間結束後,儲存的檔案會壓縮為 Parquet 檔案,並傳遞給串流管理員,以上傳到指定的 S3 儲存貯體。

  5. 串流管理員會將 Parquet 檔案上傳至 S3 儲存貯體。

注意:串流管理器 (StreamManager) 是受管元件。如需如何將資料匯出至 Amazon S3 的範例,請參閱 AWS IoT Greengrass 文件中的串流管理器。您可以使用本機MQTT代理程式作為元件,或像 Eclipse Mosquitto 這樣的其他代理程式。

工具

AWS 工具

  • Amazon Athena 是一種互動式查詢服務,可協助您使用標準 直接在 Amazon S3 中分析資料SQL。

  • Amazon Simple Storage Service (Amazon S3) 是一種雲端型物件儲存服務,可協助您儲存、保護和擷取任何數量的資料。

  • AWS IoT Greengrass 是一種開放原始碼 IoT 邊緣執行期和雲端服務,可協助您在裝置上建置、部署和管理 IoT 應用程式。

其他工具

  • Apache Parquet 是一種開放原始碼資料欄導向的資料檔案格式,專為儲存和擷取而設計。

  • MQTT (訊息佇列遙測傳輸) 是一種輕量型傳訊通訊協定,專為受限的裝置而設計。

最佳實務

針對上傳的資料使用正確的分割區格式

對於 S3 儲存貯體 (例如 "myAwesomeDataSet/""dataFromSource") 中的根字首名稱沒有特定要求,但建議您使用有意義的分割區和字首,以便輕鬆了解資料集的目的。

我們也建議您在 Amazon S3 中使用正確的分割,以便在資料集上以最佳方式執行查詢。在下列範例中,資料會以 HIVE 格式分割,以便最佳化每個 Athena 查詢掃描的資料量。這可改善效能並降低成本。

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

史詩

任務描述所需的技能

建立 S3 儲存貯體。

  1. 建立 S3 儲存貯體或使用現有儲存貯體。

  2. 為您要擷取 IoT 資料的 S3 儲存貯體建立有意義的字首 (例如 s3:\\<bucket>\<prefix>)。

  3. 記錄您的字首以供日後使用。

應用程式開發人員

將IAM許可新增至 S3 儲存貯體。

若要授予使用者對先前建立的 S3 儲存貯體和字首的寫入存取權,請將下列IAM政策新增至您的 AWS IoT Greengrass 角色:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

如需詳細資訊,請參閱 Aurora 文件中的建立存取 Amazon S3 資源IAM的政策

接下來,更新 S3 儲存貯體的資源政策 (如果需要),以允許使用正確的AWS主體 進行寫入存取。

應用程式開發人員
任務描述所需的技能

更新 元件的配方。

根據下列範例建立部署時,請更新元件組態

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

將 取代<region>為您的 AWS 區域、<period>您的週期間隔、<s3Bucket>您的 S3 儲存貯體,以及<s3prefix>您的 字首。

應用程式開發人員

建立元件。

執行以下任意一項:

  • 建立元件

  • 將元件新增至 CI/CD 管道 (如果有)。請務必將成品從成品儲存庫複製到 AWS IoT Greengrass 成品儲存貯體。然後,建立或更新 AWS IoT Greengrass 元件。

  • 將MQTT代理程式新增為元件,或稍後手動新增。注意:此決定會影響您可以與代理程式搭配使用的身分驗證方案。手動新增代理程式會將代理程式與 AWS IoT Greengrass 分離,並啟用代理程式的任何支援身分驗證方案。AWS 提供的代理程式元件具有預先定義的身分驗證方案。如需詳細資訊,請參閱 MQTT 3.1.1 代理程式 (Moquette)MQTT 5 代理程式 (EMQX)

應用程式開發人員

更新MQTT用戶端。

範例程式碼不使用身分驗證,因為元件會在本機連線至代理程式。如果您的案例不同,請視需要更新MQTT用戶端區段。此外,請執行下列動作:

  1. 更新訂閱中的MQTT主題。

  2. 視需要更新MQTT訊息剖析器,因為來自每個來源的訊息可能不同。

應用程式開發人員
任務描述所需的技能

更新核心裝置的部署。

如果 AWS IoT Greengrass 第 2 版核心裝置的部署已存在,請修改部署 。如果部署不存在,請建立新的部署

若要為元件提供正確的名稱,請根據下列項目更新新元件的日誌管理員組態 (如果需要):

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

最後,完成 AWS IoT Greengrass 核心裝置的部署修訂。

應用程式開發人員
任務描述所需的技能

檢查 AWS IoT Greengrass 磁碟區的日誌。

檢查以下各項:

  • MQTT 用戶端已成功連線至本機MQTT代理程式。

  • MQTT 用戶端已訂閱正確的主題。

  • 感應器更新訊息即將傳送至MQTT主題的代理程式。

  • Parquet 壓縮在每個週期性間隔發生。

應用程式開發人員

檢查 S3 儲存貯體。

確認資料是否正在上傳到 S3 儲存貯體。您可以在每個期間看到正在上傳的檔案。

您也可以在下一節中查詢資料,確認資料是否已上傳至 S3 儲存貯體。

應用程式開發人員
任務描述所需的技能

建立資料庫和資料表。

  1. 建立 AWS Glue 資料庫 (如有需要)。

  2. 在 AWS Glue 中手動建立資料表,或在 Glue AWS 中執行 Acrawler

應用程式開發人員

授予 Athena 對資料的存取權。

  1. 更新許可以允許 Athena 存取 S3 儲存貯體。如需詳細資訊,請參閱 Athena 文件中的 Glue Data Catalog AWS 中精細存取資料庫和資料表

  2. 查詢資料庫中的資料表。

應用程式開發人員

故障診斷

問題解決方案

MQTT 用戶端無法連線

MQTT 用戶端無法訂閱

驗證MQTT代理程式的許可。如果您有來自 的MQTT代理程式AWS,請參閱 MQTT 3.1.1 代理程式 (Moquette)MQTT 5 個代理程式 (EMQX)

未建立 Parquet 檔案

  • 驗證MQTT主題是否正確。

  • 確認來自感應器MQTT的訊息格式正確。

物件不會上傳至 S3 儲存貯體

  • 確認您具有網際網路連線和端點連線。

  • 驗證 S3 儲存貯體的資源政策是否正確。

  • 驗證 AWS IoT Greengrass 第 2 版核心裝置角色的許可。

相關資源

其他資訊

成本分析

下列成本分析案例示範此模式涵蓋的資料擷取方法如何影響 AWS Cloud 中的資料擷取成本。此案例中的定價範例是以發佈時的價格為基礎。價格可能變動。此外,您的成本可能會因您的AWS區域、AWS服務配額和與雲端環境相關的其他因素而有所不同。

輸入訊號集

此分析使用下列一組輸入訊號,做為比較 IoT 擷取成本與其他可用替代方案的基礎。

訊號數量

Frequency (頻率)

每個訊號的資料

125

25 Hz

8 位元組

在此案例中,系統會接收 125 個訊號。每個訊號都是 8 個位元組,每 40 毫秒 (25 Hz) 就會發生。這些訊號可能會個別出現,或在通用承載中分組。您可以選擇根據您的需求分割和封裝這些訊號。您也可以判斷延遲。延遲包含接收、累積和擷取資料的期間。

為了比較,此案例的擷取操作是以 us-east-1AWS區域為基礎。成本比較僅適用於 AWS服務。其他成本,例如硬體或連線能力,不會納入分析。

成本比較

下表顯示每個擷取方法的每月成本,以美元 (USD) 為單位。

方法

每月成本

AWS IoT SiteWise *

331.77 USD

AWS IoT SiteWise Edge 搭配資料處理套件 (將所有資料保留在邊緣)

200 USD

AWS 存取原始資料的 IoT Core 和 Amazon S3 規則

84.54 USD

在邊緣進行 Parquet 檔案壓縮並上傳至 Amazon S3

0.5 USD

*必須對資料進行向下取樣,以符合服務配額。這表示此方法有一些資料遺失。

替代方法

本節顯示下列替代方法的同等成本:

  • AWS IoT SiteWise – 每個訊號都必須以個別訊息上傳。因此,每月的訊息總數為 125×25×3600×24×30,或每月 81 億則訊息。不過,AWSIoT SiteWise 每個屬性每秒只能處理 10 個資料點。假設將資料縮減為 10 Hz,則每月的訊息數量會減少為 125×10×3600×24×30 或 32.4 億。如果您使用以 10 (USD每百萬則訊息 1 則) 為群組封裝測量結果的發佈者元件,則USD每月會獲得 324 筆費用。假設每個訊息都是 8 個位元組 (1 Kb/125),即 25.92 Gb 的資料儲存體。這增加了每月 7.77 的成本。 USD第一個月的總成本為 331.77,USD每月USD增加 7.77。

  • AWS IoT SiteWise Edge 具有資料處理套件,包括在邊緣完全處理的所有模型和訊號 (即沒有雲端擷取) – 您可以使用資料處理套件作為替代方案,以降低成本並設定在邊緣計算的所有模型。即使未執行實際計算,這也可以僅適用於儲存和視覺化。在此情況下,邊緣閘道必須使用強大的硬體。每月固定成本為 200USD。

  • 由 MQTT和 IoT 規則直接擷取至 AWS IoT Core,以將原始資料儲存在 Amazon S3 中 – 假設所有訊號都發佈在通用承載中,則發佈至 AWS IoT Core 的訊息總數為每月 25×3600×24×30 或 6480 萬。USD 每百萬則 1 則訊息,即每月 64.8 的USD每月成本。USD 每百萬條規則啟用 0.15 次,每則訊息有一個規則,每月增加 19.44 次的成本USD。Amazon S3 中USD每 Gb 儲存的成本為 0.023,USD每月又增加 1.5 (每月增加以反映新資料)。 Amazon S3 第一個月的總成本為 84.54,USD每月增加 1.5。 USD

  • 壓縮 Parquet 檔案中邊緣的資料並上傳至 Amazon S3 (建議的方法 ) – 壓縮率取決於資料類型。使用針對 測試的相同工業資料MQTT,整個月份的總輸出資料為 1.2 Gb。每月 0.03 的費用USD。其他基準中描述的壓縮比率 (使用隨機資料) 的順序為 66% (較接近最壞情況)。總資料為 21 Gb,USD每月成本為 0.5。

Parquet 檔案產生器

下列程式碼範例顯示以 Python 寫入的 Parquet 檔案產生器結構。程式碼範例僅供說明之用,如果貼入您的環境,則無法運作。

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)