以符合成本效益的方式,使用 IoT 資料直接導入 Amazon S3 AWS - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

以符合成本效益的方式,使用 IoT 資料直接導入 Amazon S3 AWS

創建者塞巴斯蒂安·維維亞尼 (AWS) 和里茲萬·西德 () AWS

環境:PoC 或試點

技術:分析;IoT

工作負載:開源

AWS服務:AWSIoT Greengrass; Amazon S3; 亞馬 Amazon Athena

Summary

此模式說明如何使用 IoT Greengrass 第 2 版裝置,以符合成本效益的方式將物聯網 (IoT) 資料直接導入 Amazon 簡易儲存服務 (Amazon S3) 儲存貯體。AWS裝置執行可讀取 IoT 資料的自訂元件,並將資料儲存在永久性儲存裝置 (亦即本機磁碟或磁碟區) 中。然後,裝置會將 IoT 資料壓縮為 Apache Parquet 檔案,並定期將資料上傳到 S3 儲存貯體。

您擷取的 IoT 資料量和速度僅受邊緣硬體功能和網路頻寬的限制。您可以使用 Amazon Athena 以符合成本效益的方式分析擷取的資料。Athena 支持使用 Amazon 託管 Graf ana 壓縮 Apache 實木複合地板文件和數據可視化。

先決條件和限制

先決條件

限制

  • 此模式中的資料不會即時上傳到 S3 儲存貯體。有一個延遲時間,您可以配置延遲時間。資料會暫時緩衝在 Edge 裝置中,然後在期限到期後上傳。

  • 只能SDK在爪哇、Node.js 和 Python 中使用。

架構

目標技術堆疊

  • Amazon S3

  • AWS IoT Greengrass

  • MQTT經紀人

  • 流管理器組件

目標架構

下圖顯示旨在擷取 IoT 感應器資料並將該資料存放在 S3 儲存貯體的架構。

架構圖

該圖顯示以下工作流程:

  1. 多個傳感器(例如溫度和閥門)更新發布給當地MQTT代理商。

  2. 訂閱這些感應器的 Parquet 檔案壓縮程式會更新主題並接收這些更新。

  3. Parquet 檔案壓縮程式會將更新儲存在本機。

  4. 期限過後,儲存的檔案會壓縮到 Parquet 檔案中,並傳遞至串流管理員,以上傳至指定的 S3 儲存貯體。

  5. 流管理器將 Parquet 文件上傳到 S3 存儲桶。

注意:串流管理員 (StreamManager) 是受管理的元件。如需如何將資料匯出至 Amazon S3 的範例,請參閱 AWS IoT Greengrass 文件中的串流管理員。您可以使用本地MQTT代理作為組件或其他經紀人,如 Eclipse 蚊子

工具

AWS工具

  • Amazon Athena 是一種互動式查詢服務,可協助您使用標準直接在 Amazon S3 中分析資料SQL。

  • Amazon Simple Storage Service (Amazon S3) 是一種雲端型物件儲存服務,可協助您儲存、保護和擷取任何數量的資料。

  • AWSIoT Greengrass 是開放原始碼 IoT 邊緣執行階段和雲端服務,可協助您在裝置上建置、部署和管理 IoT 應用程式。

其他工具

  • Apache 的實木複合地板是一種開源的面向列的數據文件格式,專為存儲和檢索。

  • MQTT(訊息佇列遙測傳輸) 是專為受限裝置所設計的輕量型訊息通訊協定。

最佳實務

對上傳的數據使用正確的分區格式

S3 儲存貯體中的根前置詞名稱沒有特定需求 (例如,"myAwesomeDataSet/""dataFromSource"),但我們建議您使用有意義的分區和前置詞,以便輕鬆瞭解資料集的用途。

我們還建議您在 Amazon S3 中使用正確的分區,以便查詢在資料集上以最佳方式執行。在下列範例中,資料會以HIVE格式分割,以便最佳化每個 Athena 查詢掃描的資料量。這樣可以改善效能並降低成本。

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

史诗

任務描述所需技能

建立 S3 儲存貯體。

  1. 建立 S3 儲存貯體或使用現有儲存貯體。

  2. 為您要擷取 IoT 資料的 S3 儲存貯體建立有意義的前置詞 (例如,s3:\\<bucket>\<prefix>)。

  3. 記錄您的前綴以供日後使用。

應用程式開發人員

將IAM權限新增至 S3 儲存貯體。

若要授與使用者對先前建立之 S3 儲存貯體和前置詞的寫入存取權,請將下列IAM政策新增至您的 AWS IoT Greengrass 角色:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

如需詳細資訊,請參閱 Aurora 文件中的建立IAM政策以存取 Amazon S3 資源

接下來,更新 S3 儲存貯體的資源政策 (如果需要),以允許具有正確AWS體的寫入存取權。

應用程式開發人員
任務描述所需技能

更新元件的配方。

根據下列範例建立部署時,請更新元件組態

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

以您<region>的AWS區域、<period>定期間隔、<s3Bucket> S3 儲存貯體和您<s3prefix>的前置詞取代。

應用程式開發人員

建立元件。

執行以下任意一項:

  • 建立元件

  • 將元件新增至 CI/CD 配管 (如果有的話)。請務必將成品從成品存放庫複製到 AWS IoT Greengrass 成品值區。然後,創建或更新您的 AWS IoT Greengrass 組件。

  • 將 MQTT Broker 新增為元件,或稍後手動新增代理程式。附註:此決定會影響您可以搭配 Broker 使用的驗證配置。手動新增代理程式會將代理程式與 AWS IoT Greengrass 分離,並啟用代理程式的任何受支援的驗證配置。AWS提供的 Broker 元件具有預先定義的驗證配置。如需詳細資訊,請參閱 MQTT3.1.1 經紀人 (Moquette)MQTT5 個經紀人 (EMQX)。

應用程式開發人員

更新用MQTT戶端。

範例程式碼不會使用驗證,因為元件會在本機連線至 Broker。如果您的案例不同,請視需要更新MQTT用戶端區段。此外,請執行下列動作:

  1. 更新訂MQTT閱中的主題。

  2. 視需要更新MQTT訊息剖析器,因為來自每個來源的訊息可能不同。

應用程式開發人員
任務描述所需技能

更新核心裝置的部署。

如果 AWS IoT Greengrass 第 2 版核心裝置的部署已經存在,請修改部署。如果部署不存在,請建立新部署

若要為元件指定正確的名稱,請根據下列項目更新新元件的記錄檔管理員組態 (如果需要):

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

最後,完成針對您的 AWS IoT Greengrass 核心裝置的部署修訂。

應用程式開發人員
任務描述所需技能

檢查 AWS IoT 綠色磁碟區的記錄檔。

檢查以下各項:

  • 用MQTT戶端已成功連線至本機MQTT代理程式。

  • 用MQTT戶端已訂閱正確的主題。

  • 傳感器更新消息即將發送到有關MQTT主題的代理人。

  • 實木複合地板壓縮發生在每個週期間隔

應用程式開發人員

檢查 S3 存儲桶。

確認資料是否正在上傳到 S3 儲存貯體。您可以看到每個時期正在上傳的文件。

您也可以查詢下一節中的資料,以確認資料是否已上傳至 S3 儲存貯體。

應用程式開發人員
任務描述所需技能

創建一個數據庫和表。

  1. 建立 AWS Glue 資料庫 (如有需要)。

  2. 在 AWS Glue 中手動建立表格,或在 Glu AWS e 中執行爬蟲程式

應用程式開發人員

授予 Athena 對資料的存取權。

  1. 更新許可以允許 Athena 存取 S3 儲存貯體。如需詳細資訊,請參閱 Athena 說明文件中 AWS Glue 資料目錄中對資料庫和資料表的精確存取。

  2. 查詢資料庫中的資料表。

應用程式開發人員

故障診斷

問題解決方案

MQTT用戶端無法連線

MQTT客戶端未能訂閱

驗證代MQTT理程式的權限。如果您有經MQTT紀人AWS,請參閱 MQTT3.1.1 經紀人(Moquette)MQTT5 個經紀人(EMQX)。

鑲木地板文件沒有被創建

  • 確認主MQTT題正確無誤。

  • 驗證來自MQTT傳感器的消息格式正確。

物件未上傳至 S3 儲存貯體

  • 確認您具有網際網路連線能力和端點連線能力。

  • 確認 S3 儲存貯體的資源政策正確無誤。

  • 驗證 AWS IoT Greengrass 第 2 版核心裝置角色的權限。

相關資源

其他資訊

成本分析

下列成本分析案例示範此模式涵蓋的資料擷取方法如何影響雲端中的資料擷取成本。AWS此案例中的定價範例是以發佈時的價格為基礎。價格可能變動。此外,您的費用可能會因您的AWS地區、AWS服務配額以及與雲端環境相關的其他因素而有所不同。

輸入信號設置

此分析使用下列一組輸入訊號做為比較 IoT 擷取成本與其他可用替代方案的基礎。

信號數

Frequency (頻率)

每個訊號的資料

125

25 赫茲

8 位元組

在這種情況下,系統接收 125 個信號。每個信號為 8 個字節,並且每 40 毫秒(25 Hz)發生一次。這些信號可以單獨出現或分組在一個共同的有效載荷中。您可以根據需要選擇拆分和打包這些信號。您還可以確定延遲。延遲包括接收、累積和擷取資料的時間段。

為了進行比較,此案例的擷取作業是以「us-east-1AWS區域」為基礎。費用比較僅適用於AWS服務。其他成本 (例如硬體或連線能力) 不會計入分析中。

成本比較

下表顯示每個擷取方法的每月成本,以美元 (USD) 為單位。

方法

每月成本

AWSIoT SiteWise *

331.77 USD

AWS含資料處理套件的 IoT SiteWise Edge (將所有資料保留在邊緣)

200 USD

AWS用於存取原始資料的 IoT 核心和 Amazon S3 規則

84.54 USD

在邊緣實木複合地板文件壓縮並上傳到 Amazon S3

0.5 USD

* 資料必須縮減取樣才能符合服務配額。這意味著此方法存在一些數據丟失。

替代方法

本節顯示下列替代方法的等效成本:

  • AWSIoT SiteWise — 每個信號必須在單個消息中上傳。因此,每個月的消息總數是 125 × 25×3600×24×30,或者每月 81 億條消息。不過,AWSIoT 個屬性每秒只 SiteWise 能處理 10 個資料點。假設數據被縮減採樣到 10 赫茲,每月的消息數量減少到 125×10×3600×30,或 3.24 十億。如果您使用的發行者元件會以 10 個群組封裝度量 (USD每百萬則訊息為 1),則USD每月可獲得 324 個月的費用。假設每條消息都是 8 個字節(1 KB /125),那就是 25.92 GB 的數據存儲空間。這增加了每月 7.77 的USD每月費用。第一個月的總費用為 331.77USD,每月增加 7.77 USD。

  • AWS含資料處理套件的 IoT SiteWise Edge,包括在邊緣完整處理的所有模型和訊號 (也就是無雲端擷取) — 您可以使用資料處理套件作為替代方案,以降低成本並設定在邊緣計算的所有模型。即使沒有執行實際計算,這也可以僅用於存儲和可視化。在這種情況下,必須為邊緣閘道使用功能強大的硬體。USD每月有 200 的固定費用。

  • 透過將原始資料直接擷取至 AWS IoT 核心,以MQTT及將原始資料存放在 Amazon S3 中的 IoT 規則 — 假設所有訊號都發佈在一個通用的有效負載中,發佈到 AWS IoT 核心的訊息總數為 25×3600×24×30,或每月 64.8 百萬。USD每百萬封郵件為 1,這是每月 64.8 的USD每月費用。USD每百萬個規則啟動次數為 0.15,且每封郵件有一個規則,每月增加 19.44 USD 的每月費用。Amazon S3 中USD每 GB 儲存的成本為 0.023,每月新增 1.5 個 (USD每月增加以反映新資料)。第一個月的總費用為 84.54USD,USD每月增加 1.5。

  • 壓縮 Parquet 檔案邊緣的資料並上傳到 Amazon S3 (建議的方法) — 壓縮率取決於資料類型。通過測試相同的工業數據MQTT,整個月的總輸出數據為 1.2 Gb。這個費用為USD每月 0.03。其他基準測試中描述的壓縮比率(使用隨機數據)的順序為 66%(接近最壞情況)。總數據為 21 Gb,USD每月的費用為 0.5。

拼花文件生成器

下列程式碼範例會示範以 Python 撰寫的 Parquet 檔案產生器的結構。程式碼範例僅用於說明目的,如果貼到您的環境中,將無法運作。

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)