導出支持的配置AWS 雲端目的地 - AWS IoT Greengrass

AWS IoT Greengrass Version 1 於 2023 年 6 月 30 日進入延長壽命階段。如需詳細資訊,請參閱 AWS IoT Greengrass V1 維護政策 。在此日期之後, AWS IoT Greengrass V1 不會發佈提供功能、增強功能、錯誤修正或安全修補程式的更新。在 上執行的裝置 AWS IoT Greengrass V1 不會中斷,並會繼續運作和連線至雲端。我們強烈建議您遷移至 AWS IoT Greengrass Version 2,這會新增重要的新功能,並支援其他平台

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

導出支持的配置AWS 雲端目的地

使用者定義的 Lambda 函數使用StreamManagerClient中的AWS IoT Greengrass與串流管理員互動的核心軟體開發套件。當 Lambda 函數建立串流或者更新串流,它會傳遞一個MessageStreamDefinition對象,表示流屬性,包括導出定義。所以此ExportDefinition對象包含為流定義的導出配置。流管理器使用這些導出配置來確定導出流的位置和方式。

對象模型圖 ExportDefinition 屬性類型。

您可以在流上定義零個或多個導出配置,包括單個目標類型的多個導出配置。例如,您可以將串流匯出至兩個AWS IoT Analytics信道和 Kinesis 資料串流。

對於失敗的導出嘗試,流管理器會不斷地重試將數據導出到AWS 雲端時間長達五分鐘。重試嘗試次數沒有最大限制。

注意

StreamManagerClient還提供了目標目的地,您可以使用它將串流匯出至 HTTP 服務器。此目標僅供測試之用。它不穩定或不支援用於生產環境中。

你可以為維護這些AWS 雲端的費用。

AWS IoT Analytics 頻道

流管理器支持自動導出到AWS IoT Analytics。AWS IoT Analytics允許您對數據執行高級分析,以幫助制定業務決策並改進機器學習模型。如需詳細資訊,請參閱「」什麼是AWS IoT Analytics?中的AWS IoT Analytics使用者指南

在 中AWS IoT Greengrass核心 SDK,您的 Lambda 函數使用IoTAnalyticsConfig定義此目標類型的導出配置。如需詳細資訊,請參適用於您目標語言的軟體開發套件參考:

要求

此導出目的地有下列需求:

  • 目標通道AWS IoT Analytics必須在相同的AWS 帳戶和AWS 區域Greengrass roup。

  • 所以此Greengrass 群組角色必須允許iotanalytics:BatchPutMessage權限設置為目標通道。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    您可以為資源授予細微或條件式存取 (例如,使用萬用字元)*命名方案。如需詳細資訊,請參閱「」新增和移除 IAM 政策中的IAM User Guide

匯出至AWS IoT Analytics

建立串流以導出至AWS IoT Analytics,您的 Lambda 函數建立串流包含一或多個IoTAnalyticsConfig物件。此對象定義導出設置,例如目標通道、批處理大小、批處理間隔和優先級。

當您的 Lambda 函數從設備接收數據時,它們附加訊息,其中包含到目標流的數據 Blob。

然後,流管理器根據流導出配置中定義的批處理設置和優先級導出數據。

 

Amazon Kinesis Data Streams

串流管理員支援 Amazon Kinesis Data Streams 自動導出。Kinesis Data Streams 通常用於彙總大量數據,並將其載入數據倉儲或 map-reduce 集。如需詳細資訊,請參閱「」什麼是 Amazon Kinesis Data Streams?中的Amazon Kinesis 開發人員指南

在 中AWS IoT Greengrass核心 SDK,您的 Lambda 函數使用KinesisConfig定義此目標類型的導出配置。如需詳細資訊,請參適用於您目標語言的軟體開發套件參考:

要求

此導出目的地有下列需求:

  • Kinesis Data Streams 中的目標串流必須位於相同的AWS 帳戶和AWS 區域Greengrass roup。

  • 所以此Greengrass 群組角色必須允許kinesis:PutRecords權限來定位數據流。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    您可以為資源授予細微或條件式存取 (例如,使用萬用字元)*命名方案。如需詳細資訊,請參閱「」新增和移除 IAM 政策中的IAM User Guide

導出至 Kinesis Data Streams

要創建導出到 Kinesis Data Streams,您的 Lambda 函數建立串流包含一或多個KinesisConfig物件。此對象定義導出設置,例如目標數據流、批處理大小、批處理間隔和優先級。

當您的 Lambda 函數從設備接收數據時,它們附加訊息,其中包含到目標流的數據 Blob。然後,流管理器根據流導出配置中定義的批處理設置和優先級導出數據。

流管理器會為上傳到 Amazon Kinesis 的每條記錄生成一個唯一的隨機 UUID 作為分區密鑰。

 

AWS IoT SiteWise資產屬性

流管理器支持自動導出到AWS IoT SiteWise。AWS IoT SiteWise允許您大規模地收集、組織和分析工業設備的資料。如需詳細資訊,請參閱「」什麼是AWS IoT SiteWise?中的AWS IoT SiteWise使用者指南

在 中AWS IoT Greengrass核心 SDK,您的 Lambda 函數使用IoTSiteWiseConfig定義此目標類型的導出配置。如需詳細資訊,請參適用於您目標語言的軟體開發套件參考:

注意

AWS還提供IoT SiteWise 接頭,這是一個預構建的解決方案,您可以與 OPC-UA 源一起使用。

要求

此導出目的地有下列需求:

  • 目標資產屬性AWS IoT SiteWise必須在相同的AWS 帳戶和AWS 區域Greengrass roup。

    注意

    如需區域列表,AWS IoT SiteWise支持,請參閲AWS IoT SiteWise端點和配額中的AWS一般參考

  • 所以此Greengrass 群組角色必須允許iotsitewise:BatchPutAssetPropertyValue權限來定位資源屬性。以下範例政策使用iotsitewise:assetHierarchyPath條件鍵授予目標根資產及其子級的存取。您可以移除Condition以允許存取您的所有AWS IoT SiteWise資產或指定單個資產的 ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    您可以為資源授予細微或條件式存取 (例如,使用萬用字元)*命名方案。如需詳細資訊,請參閱「」新增和移除 IAM 政策中的IAM User Guide

    如需重要的安全性資訊,請參 BatchPutAssetPropertyValue 授權中的AWS IoT SiteWise使用者指南

匯出至AWS IoT SiteWise

建立串流以導出至AWS IoT SiteWise,您的 Lambda 函數建立串流包含一或多個IoTSiteWiseConfig物件。此對象定義導出設置,如批處理大小、批處理間隔和優先級。

當 Lambda 函數從設備接收資產屬性數據時,它們會將包含數據的消息附加到目標流。消息是 JSON 序列化的PutAssetPropertyValueEntry對象,其中包含一個或多個資源屬性的屬性值。如需詳細資訊,請參閱「」附加訊息為了AWS IoT SiteWise匯出目的地。

注意

當您將資料發送到AWS IoT SiteWise,您的資料必須符合BatchPutAssetPropertyValue動作。如需詳細資訊,請參閱《AWS IoT SiteWise API 參考》中的 BatchPutAssetPropertyValue

然後,流管理器根據流導出配置中定義的批處理設置和優先級導出數據。

 

您可以調整流管理器設置和 Lambda 函數邏輯來設計導出策略。例如:

  • 對於近乎實時的導出,請設置較低的批量大小和間隔設置,並在接收數據時將數據附加到流中。

  • 要優化批處理、緩解帶寬限制或最大限度地降低成本,Lambda 函數可以將 timestamp-quality-value (TQV) 為單個資產屬性接收的數據點,然後再將數據附加到流中。一種策略是在一條消息中批處理最多 10 個不同的屬性-資產組合或屬性別名的條目,而不是為同一屬性發送多個條目。這有助於流管理器保持在AWS IoT SiteWise配額

 

Amazon S3 物件

串流管理員支援 Amazon S3 自動導出。您可以使用 Amazon S3 存放和檢索大量資料。如需詳細資訊,請參閱「」什麼是 Amazon S3?中的Amazon Simple Service 開發人員指南

在 中AWS IoT Greengrass核心 SDK,您的 Lambda 函數使用S3ExportTaskExecutorConfig定義此目標類型的導出配置。如需詳細資訊,請參適用於您目標語言的軟體開發套件參考:

要求

此導出目的地有下列需求:

  • 目標 Amazon S3 儲存貯體必須位於相同AWS 帳戶作為 Greengrass 組。

  • 如果默認容器化Greengrass Greengrass GreengrassGreengrass 容器,您必須設定流管理器-只讀-目錄參數使用輸入文件目錄,該文件目錄位於/tmp或者不在根文件系統上。

  • 如果一個 Lambda 函數運行在Greengrass 容器模式將輸入文件寫入到輸入文件目錄中,則必須為該目錄創建本地卷資源,並將該目錄掛載到具有寫入權限的容器中。這可確保文件寫入根文件系統並在容器外部可見。如需詳細資訊,請參閱 使用 Lambda 函數和連接器存取本機資源

  • 所以此Greengrass 群組角色必須允許對目標存儲桶具有以下權限。例如:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    您可以為資源授予細微或條件式存取 (例如,使用萬用字元)*命名方案。如需詳細資訊,請參閱「」新增和移除 IAM 政策中的IAM User Guide

匯出至 Amazon S3

要創建導出到 Amazon S3 的流,您的 Lambda 函數使用S3ExportTaskExecutorConfig對象配置導出策略。該策略定義了導出設置,例如分段上傳閾值和優先級。對於 Amazon S3 導出,流管理器會上傳它從核心設備上的本地文件讀取的數據。要啟動上載,您的 Lambda 函數會將導出任務附加到目標流。導出任務包含有關輸入文件和目標 Amazon S3 對象的信息。流管理器按照追加到流的順序執行任務。

注意

目標儲存貯體必須已存在於您的AWS 帳戶。如果指定鍵的對象不存在,流管理器將為您創建該對象。

下圖顯示此高級別工作流程。

Amazon S3 導出的串流管理員工作流程圖。

流管理器使用分段上傳閾值屬性最小零件大小設置和輸入文件的大小以確定如何上傳數據。分段上傳閾值必須大於或等於最小分段大小。如果您想要 parallel 行上傳資料串流,您可以建立多個串流。

指定目標 Amazon S3 對象的密鑰可以包含有效的Java DateTimeFormatter在中的字串!{timestamp:value}佔位置。您可以使用這些時間戳佔位符根據上傳輸輸入文件數據的時間對 Amazon S3 中的數據進行分區。例如,下列鍵名稱會解析為諸如my-key/2020/12/31/data.txt

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注意

如果要監視流的導出狀態,請首先創建一個狀態流,然後將導出流配置為使用它。如需詳細資訊,請參閱 監控匯出任務

管理輸入資料

您可以編寫 IoT 應用程序用於管理輸入數據生命週期的代碼。以下範例工作流程顯示如何使用 Lambda 函數來管理此資料。

  1. 本地進程從設備或外圍設備接收數據,然後將數據寫入核心設備上某個目錄中的文件。這些是流管理器的輸入文件。

    注意

    要確定是否必須配置對輸入文件目錄的訪問權限,請參閲流管理器-只讀-目錄參數。

    串流管理員所在的進程將繼承默認訪問身分用於組的。串流管理員必須具備存取輸入文件的許可。您可以使用chmod(1)命令以更改文件許可 (如果需要的話)。

  2. Lambda 函數掃描目錄和附加匯出任務添加到目標流。該任務是一個 JSON 序列化S3ExportTaskDefinition對象,該對象指定輸入文件的 URL、目標 Amazon S3 存儲桶和密鑰以及可選的用户元數據。

  3. 流管理器讀取輸入文件並按附加任務的順序將數據導出到 Amazon S3。目標儲存貯體必須已存在於您的AWS 帳戶。如果指定鍵的對象不存在,流管理器將為您創建該對象。

  4. Lambda 函數讀取訊息以監視導出狀態。導出任務完成後,Lambda 函數可以刪除相應的輸入文件。如需詳細資訊,請參閱 監控匯出任務

監控匯出任務

您可以編寫 IoT 應用程序來監控 Amazon S3 導出狀態的代碼。您的 Lambda 函數必須創建狀態流,然後將導出流配置為將狀態更新寫入狀態流。單個狀態流可以從導出到 Amazon S3 的多個流中接收狀態更新。

首先,建立串流用作狀態流。您可以配置流的大小和保留策略,以控制狀態消息的生命週期。例如:

  • 設定PersistenceMemory如果您不想存放狀態消息。

  • 設定StrategyOnFullOverwriteOldestData,以便新的狀態消息不會丟失。

然後,創建或更新導出流以使用狀態流。具體來説,設置流的S3ExportTaskExecutorConfig匯出配置。這會告訴流管理器將有關導出任務的狀態消息寫入狀態流。在 中StatusConfig對象中,指定狀態流的名稱和詳細程度級別。以下支持的值範圍為最小詳細(ERROR)到最詳細的(TRACE。預設為 INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

 

以下示例工作流顯示 Lambda 函數如何使用狀態流來監視導出狀態。

  1. 如上一個工作流程中所述,Lambda 函數附加匯出任務添加到配置為將有關導出任務的狀態消息寫入狀態流的流。追加操作返回表示任務 ID 的序列號。

  2. Lambda 函數讀取訊息,然後根據流名稱和任務 ID 或根據消息上下文中的導出任務屬性篩選消息。例如,Lambda 函數可以按導出任務的輸入文件 URL 進行過濾,該 URL 由S3ExportTaskDefinition對象在消息上下文中。

    以下狀態代碼表示導出任務已達到完成狀態:

    • Success。 已完成上傳已順利完成。

    • Failure。 流管理器遇到錯誤,例如,指定的存儲桶不存在。解決問題後,您可以再次將導出任務附加到流中。

    • Canceled。 該任務已中止,原因是流或導出定義已刪除,或者 time-to-live (TTL) 期限已過期。

    注意

    該任務還可能具有InProgress或者Warning。當事件返回不影響任務執行的錯誤時,流管理器會發出警告。例如,清理中止的部分上載失敗將返回警告。

  3. 導出任務完成後,Lambda 函數可以刪除相應的輸入文件。

以下示例説明 Lambda 函數如何讀取和處理狀態消息。

Python
import time from greengrasssdk.stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from greengrasssdk.stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 開發套件參考:讀取消息|StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 開發套件參考:readMessages|StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require('aws-greengrass-core-sdk').StreamManager; const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:readMessages|StatusMessage