本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 IoT Greengrass,以具成本效益的方式直接將 AWS IoT 資料擷取至 Amazon S3
由 Sebastian Viviani (AWS) 和 Rizwan Syed (AWS) 建立
環境:PoC 或試行 | 技術:分析;IoT | 工作負載:開放原始碼 |
AWS 服務:AWSIoT Greengrass;Amazon S3;Amazon Athena |
Summary
此模式示範如何使用 AWS IoT Greengrass 第 2 版裝置,以經濟實惠的方式將物聯網 (IoT) 資料直接擷取到 Amazon Simple Storage Service (Amazon S3) 儲存貯體。裝置會執行自訂元件來讀取 IoT 資料,並將資料儲存在持久性儲存體 (即本機磁碟區或磁碟區) 中。然後,裝置會將 IoT 資料壓縮為 Apache Parquet 檔案,並定期將資料上傳到 S3 儲存貯體。
您擷取的 IoT 資料數量和速度僅受限於您的邊緣硬體功能和網路頻寬。您可以使用 Amazon Athena 以符合成本效益的方式分析擷取的資料。Athena 使用 Amazon Managed Grafana 支援壓縮的 Apache Parquet 檔案和資料視覺化。
先決條件和限制
先決條件
作用中AWS帳戶
在 AWS IoT Greengrass 第 2 版上執行並從感應器收集資料的邊緣閘道 (資料來源和資料收集程序超出此模式的範圍,但您可以使用幾乎任何類型的感應器資料。 此模式使用本機MQTT
代理程式搭配在本機發佈資料的感應器或閘道。) 將資料上傳至 S3 儲存貯體的串流管理器元件
AWS SDK for Java
、 AWS SDK for JavaScript 、 或 AWS SDK for Python (Boto3) 執行 APIs
限制
此模式中的資料不會即時上傳至 S3 儲存貯體。有延遲期間,您可以設定延遲期間。資料會在邊緣裝置中暫時緩衝,然後在期間到期後上傳。
SDK 僅適用於 Java、Node.js 和 Python。
架構
目標技術堆疊
Amazon S3
AWS IoT Greengrass
MQTT 代理程式
串流管理員元件
目標架構
下圖顯示設計用來擷取 IoT 感應器資料的架構,並將該資料存放在 S3 儲存貯體中。
該圖顯示以下工作流程:
多個感應器 (例如,溫度和閥) 更新會發佈給本機MQTT代理程式。
訂閱這些感應器的 Parquet 檔案壓縮器會更新主題並接收這些更新。
Parquet 檔案壓縮器會在本機儲存更新。
期間結束後,儲存的檔案會壓縮為 Parquet 檔案,並傳遞給串流管理員,以上傳到指定的 S3 儲存貯體。
串流管理員會將 Parquet 檔案上傳至 S3 儲存貯體。
注意:串流管理器 (StreamManager
) 是受管元件。如需如何將資料匯出至 Amazon S3 的範例,請參閱 AWS IoT Greengrass 文件中的串流管理器。您可以使用本機MQTT代理程式作為元件,或像 Eclipse Mosquitto
工具
AWS 工具
Amazon Athena 是一種互動式查詢服務,可協助您使用標準 直接在 Amazon S3 中分析資料SQL。
Amazon Simple Storage Service (Amazon S3) 是一種雲端型物件儲存服務,可協助您儲存、保護和擷取任何數量的資料。
AWS IoT Greengrass 是一種開放原始碼 IoT 邊緣執行期和雲端服務,可協助您在裝置上建置、部署和管理 IoT 應用程式。
其他工具
Apache Parquet
是一種開放原始碼資料欄導向的資料檔案格式,專為儲存和擷取而設計。 MQTT (訊息佇列遙測傳輸) 是一種輕量型傳訊通訊協定,專為受限的裝置而設計。
最佳實務
針對上傳的資料使用正確的分割區格式
對於 S3 儲存貯體 (例如 "myAwesomeDataSet/"
或 "dataFromSource"
) 中的根字首名稱沒有特定要求,但建議您使用有意義的分割區和字首,以便輕鬆了解資料集的目的。
我們也建議您在 Amazon S3 中使用正確的分割,以便在資料集上以最佳方式執行查詢。在下列範例中,資料會以 HIVE 格式分割,以便最佳化每個 Athena 查詢掃描的資料量。這可改善效能並降低成本。
s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet
史詩
任務 | 描述 | 所需的技能 |
---|---|---|
建立 S3 儲存貯體。 | 應用程式開發人員 | |
將IAM許可新增至 S3 儲存貯體。 | 若要授予使用者對先前建立的 S3 儲存貯體和字首的寫入存取權,請將下列IAM政策新增至您的 AWS IoT Greengrass 角色:
如需詳細資訊,請參閱 Aurora 文件中的建立存取 Amazon S3 資源IAM的政策。 接下來,更新 S3 儲存貯體的資源政策 (如果需要),以允許使用正確的AWS主體 進行寫入存取。 | 應用程式開發人員 |
任務 | 描述 | 所需的技能 |
---|---|---|
更新 元件的配方。 |
將 取代 | 應用程式開發人員 |
建立元件。 | 執行以下任意一項:
| 應用程式開發人員 |
更新MQTT用戶端。 | 範例程式碼不使用身分驗證,因為元件會在本機連線至代理程式。如果您的案例不同,請視需要更新MQTT用戶端區段。此外,請執行下列動作:
| 應用程式開發人員 |
任務 | 描述 | 所需的技能 |
---|---|---|
更新核心裝置的部署。 | 如果 AWS IoT Greengrass 第 2 版核心裝置的部署已存在,請修改部署 。如果部署不存在,請建立新的部署 。 若要為元件提供正確的名稱,請根據下列項目更新新元件的日誌管理員組態 (如果需要):
最後,完成 AWS IoT Greengrass 核心裝置的部署修訂。 | 應用程式開發人員 |
任務 | 描述 | 所需的技能 |
---|---|---|
檢查 AWS IoT Greengrass 磁碟區的日誌。 | 檢查以下各項:
| 應用程式開發人員 |
檢查 S3 儲存貯體。 | 確認資料是否正在上傳到 S3 儲存貯體。您可以在每個期間看到正在上傳的檔案。 您也可以在下一節中查詢資料,確認資料是否已上傳至 S3 儲存貯體。 | 應用程式開發人員 |
任務 | 描述 | 所需的技能 |
---|---|---|
建立資料庫和資料表。 |
| 應用程式開發人員 |
授予 Athena 對資料的存取權。 |
| 應用程式開發人員 |
故障診斷
問題 | 解決方案 |
---|---|
MQTT 用戶端無法連線 |
|
MQTT 用戶端無法訂閱 | 驗證MQTT代理程式的許可。如果您有來自 的MQTT代理程式AWS,請參閱 MQTT 3.1.1 代理程式 (Moquette) 和 MQTT 5 個代理程式 (EMQX)。 |
未建立 Parquet 檔案 |
|
物件不會上傳至 S3 儲存貯體 |
|
相關資源
DataFrame
(Pandas 文件) Apache Parquet 文件
(Parquet 文件) 開發 AWS IoT Greengrass 元件 (AWS IoT Greengrass 開發人員指南,第 2 版)
將 AWS IoT Greengrass 元件部署至 裝置 (AWS IoT Greengrass 開發人員指南,第 2 版)
與本機 IoT 裝置互動 (AWS IoT Greengrass 開發人員指南,第 2 版)
MQTT 3.1.1 代理程式 (Moquette) (AWS IoT Greengrass 開發人員指南,第 2 版)
MQTT 5 代理程式 (EMQX) (AWS IoT Greengrass 開發人員指南,第 2 版)
其他資訊
成本分析
下列成本分析案例示範此模式涵蓋的資料擷取方法如何影響 AWS Cloud 中的資料擷取成本。此案例中的定價範例是以發佈時的價格為基礎。價格可能變動。此外,您的成本可能會因您的AWS區域、AWS服務配額和與雲端環境相關的其他因素而有所不同。
輸入訊號集
此分析使用下列一組輸入訊號,做為比較 IoT 擷取成本與其他可用替代方案的基礎。
訊號數量 | Frequency (頻率) | 每個訊號的資料 |
125 | 25 Hz | 8 位元組 |
在此案例中,系統會接收 125 個訊號。每個訊號都是 8 個位元組,每 40 毫秒 (25 Hz) 就會發生。這些訊號可能會個別出現,或在通用承載中分組。您可以選擇根據您的需求分割和封裝這些訊號。您也可以判斷延遲。延遲包含接收、累積和擷取資料的期間。
為了比較,此案例的擷取操作是以 us-east-1
AWS區域為基礎。成本比較僅適用於 AWS服務。其他成本,例如硬體或連線能力,不會納入分析。
成本比較
下表顯示每個擷取方法的每月成本,以美元 (USD) 為單位。
方法 | 每月成本 |
AWS IoT SiteWise * | 331.77 USD |
AWS IoT SiteWise Edge 搭配資料處理套件 (將所有資料保留在邊緣) | 200 USD |
AWS 存取原始資料的 IoT Core 和 Amazon S3 規則 | 84.54 USD |
在邊緣進行 Parquet 檔案壓縮並上傳至 Amazon S3 | 0.5 USD |
*必須對資料進行向下取樣,以符合服務配額。這表示此方法有一些資料遺失。
替代方法
本節顯示下列替代方法的同等成本:
AWS IoT SiteWise – 每個訊號都必須以個別訊息上傳。因此,每月的訊息總數為 125×25×3600×24×30,或每月 81 億則訊息。不過,AWSIoT SiteWise 每個屬性每秒只能處理 10 個資料點。假設將資料縮減為 10 Hz,則每月的訊息數量會減少為 125×10×3600×24×30 或 32.4 億。如果您使用以 10 (USD每百萬則訊息 1 則) 為群組封裝測量結果的發佈者元件,則USD每月會獲得 324 筆費用。假設每個訊息都是 8 個位元組 (1 Kb/125),即 25.92 Gb 的資料儲存體。這增加了每月 7.77 的成本。 USD第一個月的總成本為 331.77,USD每月USD增加 7.77。
AWS IoT SiteWise Edge 具有資料處理套件,包括在邊緣完全處理的所有模型和訊號 (即沒有雲端擷取) – 您可以使用資料處理套件作為替代方案,以降低成本並設定在邊緣計算的所有模型。即使未執行實際計算,這也可以僅適用於儲存和視覺化。在此情況下,邊緣閘道必須使用強大的硬體。每月固定成本為 200USD。
由 MQTT和 IoT 規則直接擷取至 AWS IoT Core,以將原始資料儲存在 Amazon S3 中 – 假設所有訊號都發佈在通用承載中,則發佈至 AWS IoT Core 的訊息總數為每月 25×3600×24×30 或 6480 萬。USD 每百萬則 1 則訊息,即每月 64.8 的USD每月成本。USD 每百萬條規則啟用 0.15 次,每則訊息有一個規則,每月增加 19.44 次的成本USD。Amazon S3 中USD每 Gb 儲存的成本為 0.023,USD每月又增加 1.5 (每月增加以反映新資料)。 Amazon S3 第一個月的總成本為 84.54,USD每月增加 1.5。 USD
壓縮 Parquet 檔案中邊緣的資料並上傳至 Amazon S3 (建議的方法 ) – 壓縮率取決於資料類型。使用針對 測試的相同工業資料MQTT,整個月份的總輸出資料為 1.2 Gb。每月 0.03 的費用USD。其他基準中描述的壓縮比率 (使用隨機資料) 的順序為 66% (較接近最壞情況)。總資料為 21 Gb,USD每月成本為 0.5。
Parquet 檔案產生器
下列程式碼範例顯示以 Python 寫入的 Parquet 檔案產生器結構。程式碼範例僅供說明之用,如果貼入您的環境,則無法運作。
import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)