

支援終止通知：2026 年 10 月 7 日 AWS 將停止 的支援 AWS IoT Greengrass Version 1。2026 年 10 月 7 日之後，您將無法再存取 AWS IoT Greengrass V1 資源。如需詳細資訊，請造訪[從 遷移 AWS IoT Greengrass Version 1](https://docs.aws.amazon.com/greengrass/v2/developerguide/migrate-from-v1.html)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 管理 AWS IoT Greengrass 核心上的資料串流
<a name="stream-manager"></a>

AWS IoT Greengrass 串流管理員可讓您更輕鬆、更可靠地將大量 IoT 資料傳輸至 AWS 雲端。串流管理員會在本機處理資料串流，並 AWS 雲端 自動將其匯出至 。此功能會與常見的邊緣案例整合，例如機器學習 (ML) 推論，其中資料會在本機處理和分析，然後再匯出至 AWS 雲端 或本機儲存目的地。

串流管理員簡化了應用程式的開發。您的 IoT 應用程式可以使用標準化機制來處理大量串流並管理本機資料保留政策，而無須建立自訂串流管理功能。IoT 應用程式可以讀取和寫入串流。它們可以針對每個串流定義儲存類型、大小和資料保留政策，以控制串流管理員處理和匯出串流的方式。

串流管理員是為了在具有間歇性或連線能力有限的環境中工作而設計。您可以定義頻寬使用、逾時行為，以及當核心已連線或中斷連線時如何處理串流資料。對於關鍵資料，您可以設定優先順序，以控制串流匯出至 的順序 AWS 雲端。

您可以設定自動匯出至 AWS 雲端 以供儲存或進一步處理和分析。串流管理員支援匯出至下列 AWS 雲端 目的地。<a name="supported-export-destinations"></a>
+ 中的頻道 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 可讓您對資料執行進階分析，以協助做出業務決策並改善機器學習模型。如需詳細資訊，請參閱*AWS IoT Analytics 《 使用者指南*》中的[什麼是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html)。
+ Kinesis Data Streams 中的串流。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用於彙總大量資料，並將其載入資料倉儲或地圖縮減叢集。如需詳細資訊，請參閱[《Amazon Kinesis 開發人員指南》中的什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。 *Amazon Kinesis *
+ 中的資產屬性 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 可讓您大規模收集、組織和分析工業設備的資料。如需詳細資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的[什麼是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html)。
+ Amazon S3 中的物件。<a name="s3-export-destination"></a>您可以使用 Amazon S3 來存放和擷取大量資料。如需詳細資訊，請參閱《[Amazon Simple Storage Service 開發人員指南》中的什麼是 Amazon S3？](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)。 **

## 串流管理工作流程
<a name="stream-manager-workflow"></a>

您的 IoT 應用程式會透過 AWS IoT Greengrass 核心 SDK 與串流管理員互動。在簡單的工作流程中，在 Greengrass 核心上執行的使用者定義 Lambda 函數會使用 IoT 資料，例如時間序列溫度和壓力指標。Lambda 函數可能會篩選或壓縮資料，然後呼叫 AWS IoT Greengrass 核心 SDK 將資料寫入串流管理員中的串流。串流管理員可以根據為串流定義的政策，自動將串流匯出至 AWS 雲端 。使用者定義的 Lambda 函數也可以將資料直接傳送到本機資料庫或儲存儲存庫。

您的 IoT 應用程式可以包含多個使用者定義的 Lambda 函數，這些函數可讀取或寫入串流。這些本機 Lambda 函數可以讀取和寫入串流，以在本機篩選、彙總和分析資料。如此便可在資料從核心傳輸到雲端或本機目的地之前，快速回應本機事件並擷取有價值的資訊。

工作流程範例如下圖所示。

![\[串流管理員工作流程圖。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-architecture.png)


若要使用串流管理員，請先設定串流管理員參數，以定義套用至 Greengrass 核心上所有串流的群組層級執行時間設定。這些可自訂的設定可讓您根據您的業務需求和環境限制，控制串流管理員如何存放、處理和匯出串流。如需詳細資訊，請參閱[設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)。

設定串流管理員之後，您可以建立和部署 IoT 應用程式。這些通常是使用者定義的 Lambda 函數，在 AWS IoT Greengrass 核心 SDK `StreamManagerClient`中使用 來建立串流並與之互動。在建立串流期間，Lambda 函數會定義每個串流政策，例如匯出目的地、優先順序和持久性。如需詳細資訊，包括 `StreamManagerClient`操作的程式碼片段，請參閱 [使用 StreamManagerClient 搭配串流](work-with-streams.md)。

如需設定簡單工作流程的教學課程，請參閱 [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)或 [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)。

## 要求
<a name="stream-manager-requirements"></a>

下列需求適用於使用串流管理員：
+ 您必須使用 AWS IoT Greengrass Core 軟體 v1.10 或更新版本，並啟用串流管理員。如需詳細資訊，請參閱[設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)。

  <a name="stream-manager-not-supported-openwrt-para"></a>OpenWrt 發行版本不支援串流管理員。
+ Java 8 執行時間 (JDK 8) 必須安裝在核心上。<a name="install-java8-runtime-general"></a>
  + 針對以 Debian 為基礎的發行版本 (包括 Raspbian) 或以 Ubuntu 為基礎的發行版本，請執行下列命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 針對以 Red Hat 為基礎的發行版本 (包括 Amazon Linux)，請執行下列命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    如需詳細資訊，請參閱 OpenJDK 文件上的[如何下載和安裝預先建置的 OpenJDK 套件](https://openjdk.java.net/install/)。

   
+ 除了基本 AWS IoT Greengrass 核心軟體之外，串流管理員至少需要 70 MB RAM。您的總記憶體需求視您的工作負載而定。

   
+ 使用者定義的 Lambda 函數必須使用 [AWS IoT Greengrass 核心 SDK](lambda-functions.md#lambda-sdks-core) 與串流管理員互動。 AWS IoT Greengrass 核心 SDK 提供多種語言，但只有下列版本支援串流管理員操作：<a name="streammanagerclient-sdk-versions"></a>
  + Java SDK (v1.4.0 或更新版本）
  + Python SDK (1.5.0 版或更新版本）
  + Node.js SDK (1.6.0 版或更新版本）

  下載對應至 Lambda 函數執行時間的 SDK 版本，並將其包含在 Lambda 函數部署套件中。
**注意**  
適用於 Python 的 AWS IoT Greengrass 核心 SDK 需要 Python 3.7 或更新版本，並具有其他套件相依性。如需詳細資訊，請參閱[建立 Lambda 函數部署套件 （主控台）](stream-manager-console.md#stream-manager-console-create-deployment-package) 或[建立 Lambda 函數部署套件 (CLI)](stream-manager-cli.md#stream-manager-cli-create-deployment-package)。
+ 如果您定義串流的 AWS 雲端 匯出目的地，則必須建立匯出目標，並在 Greengrass 群組角色中授予存取許可。視目的地而定，可能也會套用其他要求。如需詳細資訊，請參閱：<a name="export-destinations-links"></a>
  + [AWS IoT Analytics 頻道](stream-export-configurations.md#export-to-iot-analytics)
  + [Amazon Kinesis 資料串流](stream-export-configurations.md#export-to-kinesis)
  + [AWS IoT SiteWise 資產屬性](stream-export-configurations.md#export-to-iot-sitewise)
  + [Amazon S3 物件](stream-export-configurations.md#export-to-s3)

  您負責維護這些 AWS 雲端 資源。

## 資料安全
<a name="stream-manager-security"></a>

當您使用串流管理員時，請注意下列安全性考量。

### 本機資料安全性
<a name="stream-manager-security-stream-data"></a>

AWS IoT Greengrass 不會在核心裝置上的元件之間加密靜態或本機傳輸的串流資料。
+ **靜態資料**。串流資料會在本機存放在 Greengrass 核心的儲存目錄中。為了資料安全，如果啟用， AWS IoT Greengrass 則依賴 Unix 檔案許可和全磁碟加密。您可以使用選用的 [STREAM\$1MANAGER\$1STORE\$1ROOT\$1DIR](configure-stream-manager.md#STREAM_MANAGER_STORE_ROOT_DIR) 參數來指定儲存目錄。如果您稍後將此參數變更為使用不同的儲存目錄， AWS IoT Greengrass 不會刪除先前的儲存目錄或其內容。

   
+ **本機傳輸中的資料**。 AWS IoT Greengrass 不會加密 核心上本機傳輸中的串流資料，包括資料來源、Lambda 函數、 AWS IoT Greengrass 核心 SDK 和串流管理員。

   
+ **傳輸到 的資料 AWS 雲端**。串流管理員匯出至 的資料串流 AWS 雲端 ，使用標準 AWS 服務用戶端加密搭配 Transport Layer Security (TLS)。

如需詳細資訊，請參閱[資料加密](data-encryption.md)。

### 用戶端身分驗證
<a name="stream-manager-security-client-authentication"></a>

串流管理員用戶端使用 AWS IoT Greengrass 核心 SDK 與串流管理員通訊。啟用用戶端身分驗證時，只有 Greengrass 群組中的 Lambda 函數可以與串流管理員中的串流互動。用戶端身分驗證停用時，Greengrass 核心上執行的任何處理程序 (例如 [Docker 容器](docker-app-connector.md)) 都可以與串流管理員中的串流互動。您應該只在商業案例需要時，才停用身分驗證。

您可以使用 [STREAM\$1MANAGER\$1AUTHENTICATE\$1CLIENT](configure-stream-manager.md#STREAM_MANAGER_AUTHENTICATE_CLIENT) 參數來設定用戶端身分驗證模式。您可以從 主控台或 AWS IoT Greengrass API 設定此參數。變更會在部署群組之後生效。


****  

|   | 已啟用 | 已停用 | 
| --- | --- | --- | 
| 參數值 | `true` (預設和建議) | `false` | 
| 允許的用戶端 | Greengrass 群組中的使用者定義 Lambda 函數 | Greengrass 群組中的使用者定義 Lambda 函數 在 Greengrass 核心裝置上執行的其他程序 | 

## 另請參閱
<a name="stream-manager-see-also"></a>
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [使用 StreamManagerClient 搭配串流](work-with-streams.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)
+ [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)

# 設定 AWS IoT Greengrass 串流管理員
<a name="configure-stream-manager"></a>

在 AWS IoT Greengrass 核心上，串流管理員可以存放、處理和匯出 IoT 裝置資料。串流管理員可提供您用來設定群組層級執行時間設定的參數。這些設定適用於 Greengrass 核心上的所有串流。您可以使用 AWS IoT 主控台或 AWS IoT Greengrass API 來設定串流管理員設定。變更會在部署群組之後生效。

**注意**  
設定串流管理員之後，您可以建立和部署在 Greengrass 核心上執行並與串流管理員互動的 IoT 應用程式。這些 IoT 應用程式通常是使用者定義的 Lambda 函數。如需詳細資訊，請參閱[使用 StreamManagerClient 搭配串流](work-with-streams.md)。

## 串流管理員參數
<a name="stream-manager-parameters"></a>

串流管理員提供下列參數，可讓您定義群組層級的設定。所有參數都是選用的。

**儲存目錄**  <a name="STREAM_MANAGER_STORE_ROOT_DIR"></a>
參數名稱：`STREAM_MANAGER_STORE_ROOT_DIR`  
用來儲存串流的本機目錄絕對路徑。此值必須以正斜線開頭 (例如 `/data`)。  
如需保護串流資料安全的資訊，請參閱 [本機資料安全性](stream-manager.md#stream-manager-security-stream-data)。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**伺服器連接埠**  
參數名稱：`STREAM_MANAGER_SERVER_PORT`  
用於與串流管理員通訊的本機連接埠號碼。預設值為 `8088`。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**驗證用戶端**  <a name="STREAM_MANAGER_AUTHENTICATE_CLIENT"></a>
參數名稱：`STREAM_MANAGER_AUTHENTICATE_CLIENT`  
表示用戶端是否須經過驗證才能與串流管理員互動。用戶端和串流管理員之間的所有互動都由 AWS IoT Greengrass 核心 SDK 控制。此參數會決定哪些用戶端可以呼叫 AWS IoT Greengrass 核心 SDK 來使用串流。如需詳細資訊，請參閱[用戶端身分驗證](stream-manager.md#stream-manager-security-client-authentication)。  
有效值為 `true` 或 `false`。預設值為 `true` (建議)。  
+ `true`。 僅允許 Greengrass Lambda 函數做為用戶端。Lambda 函數用戶端使用內部 AWS IoT Greengrass 核心通訊協定來驗證 AWS IoT Greengrass 核心 SDK。
+ `false`。 允許在 AWS IoT Greengrass 核心上執行的任何程序成為用戶端。除非業務案例需要，否則請勿設定為 `false`。例如，`false`只有在核心裝置上的非 Lambda 程序必須直接與串流管理員通訊時，才會將此值設定為 ，例如在核心上執行的 [Docker 容器](docker-app-connector.md)。
最低 AWS IoT Greengrass 核心版本：1.10.0

**最高頻寬**  
參數名稱：`STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH`  
可用來匯出資料的平均最高頻寬 (以千位元數/秒為單位)。預設允許無限使用可用頻寬。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**執行緒集區大小**  
參數名稱：`STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE`  
可用於匯出資料的作用中執行緒數量上限。預設值為 `5`。  
最佳大小取決於您的硬體、串流磁碟區和規劃的匯出串流數量。如果匯出速度很慢，您可以調整此設定，找出適合您硬體和商務案例的最佳大小。核心裝置硬體的 CPU 和記憶體是限制因素。首先，您可以嘗試將此值設定為等同於裝置上處理器核心的數量。  
請小心不要設定高於硬體可支援的大小。每個串流都會消耗硬體資源，因此您應該嘗試限制受限裝置上的匯出串流數目。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**JVM 引數**  
參數名稱：`JVM_ARGS`  
自訂 Java 虛擬機器參數，以在啟動時傳遞給串流管理員。多個引數應用空格分隔。  
僅限必須覆寫 JVM 使用的預設設定時，才能使用此參數。例如，如果您計劃匯出大量串流，可能需要增加預設堆積大小。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**唯讀輸入檔案目錄**  <a name="stream-manager-read-only-directories"></a>
參數名稱：`STREAM_MANAGER_READ_ONLY_DIRS`  
以逗號分隔的清單，列出存放輸入檔案之根檔案系統外部目錄的絕對路徑。串流管理員會讀取檔案並將其上傳至 Amazon S3，並將目錄掛載為唯讀。如需匯出至 Amazon S3 的詳細資訊，請參閱 [Amazon S3 物件](stream-export-configurations.md#export-to-s3)。  
只有在下列情況為 true 時，才使用此參數：  
+ 匯出至 Amazon S3 之串流的輸入檔案目錄位於下列其中一個位置：
  + 根檔案系統以外的分割區。
  + 在根檔案系統的 `/tmp`下。
+ Greengrass 群組[的預設容器化](lambda-group-config.md#lambda-containerization-groupsettings)為 **Greengrass 容器**。
範例值：`/mnt/directory-1,/mnt/directory-2,/tmp`  
最低 AWS IoT Greengrass 核心版本：1.11.0

**分段上傳的大小下限**  <a name="stream-manager-minimum-part-size"></a>
參數名稱：`STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES`  
分段中組件的大小下限 （以位元組為單位） 上傳至 Amazon S3。串流管理員使用此設定和輸入檔案的大小，來決定如何在分段 PUT 請求中批次處理資料。預設和最小值為`5242880`位元組 (5 MB)。  
串流管理員使用串流的 `sizeThresholdForMultipartUploadBytes` 屬性來判斷要匯出至 Amazon S3 做為單一或分段上傳。使用者定義的 Lambda 函數會在建立匯出至 Amazon S3 的串流時設定此閾值。預設閾值為 5 MB。
最低 AWS IoT Greengrass 核心版本：1.11.0

## 設定串流管理員設定 (主控台)
<a name="configure-stream-manager-console"></a>

您可以使用 AWS IoT 主控台執行下列管理任務：
+ [檢查是否已啟用串流管理員](#check-stream-manager-console)
+ [在群組建立期間啟用或停用串流管理員](#enable-stream-manager-console-new-group)
+ [啟用或停用現有群組的串流管理員](#enable-stream-manager-console-existing-group)
+ [變更串流管理員設定](#change-stream-manager-console)

部署 Greengrass 群組之後，變更就會生效。如需說明如何部署 Greengrass 群組的教學課程，其中包含與串流管理員互動的 Lambda 函數，請參閱 [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)。

**注意**  <a name="ggstreammanager-function-config-console"></a>
當您使用主控台啟用串流管理員並部署群組時，串流管理員的記憶體大小預設為 4194304 KB (4 GB)。我們建議您將記憶體大小設定為至少 128000 KB。

 

### 檢查是否已啟用串流管理員 (主控台)
<a name="check-stream-manager-console"></a>

1. <a name="console-gg-groups"></a>在 AWS IoT 主控台導覽窗格的**管理**下，展開 **Greengrass 裝置**，然後選擇**群組 (V1)**。

1. <a name="group-choose-target-group"></a>選擇目標群組。

1. 選擇 **Lambda 函數索引標籤**。

1. 在 **System Lambda 函數**下，選取**串流管理員**，然後選擇**編輯**。

1. 檢查啟用或停用狀態。還會顯示已設定的任何自訂串流管理員設定。

 

### 在群組建立期間啟用或停用串流管理員 (主控台)
<a name="enable-stream-manager-console-new-group"></a>

1. <a name="console-gg-groups"></a>在 AWS IoT 主控台導覽窗格的**管理**下，展開 **Greengrass 裝置**，然後選擇**群組 (V1)**。

1. 選擇 **Create Group (建立群組)**。您在下一個頁面的選擇會決定設定群組串流管理員的方法。

1. 繼續進行**命名您的群組**，然後選擇 **Greengrass 核心**頁面。

1. 選擇**建立群組**。

1. 在群組組態頁面上，選擇 **Lambda 函數**索引標籤，選取**串流管理員**，然後選擇**編輯**。
   + 若要使用預設設定啟用串流管理員，請選擇**使用預設設定啟用**。

      
   + 若要以自訂設定啟用串流管理員，請選擇 **Customize settings (自訂設定)**。

     1. 在**設定串流管理員**頁面上，選擇**使用自訂設定啟用**。

     1. 在 **Custom settings (自訂設定)** 下，輸入串流管理員參數的值。如需詳細資訊，請參閱[串流管理員參數](#stream-manager-parameters)。將欄位保留空白 AWS IoT Greengrass ，以允許 使用其預設值。

         
   + 若要停用串流管理員，請選擇**停用**。

     1. 在 **Configure stream manager (設定串流管理員)** 頁面上，選擇 **Disable (停用)**。

         

1. 選擇**儲存**。

1. <a name="continue-create-group"></a>繼續透過剩餘的頁面建立群組。

1. 在**用戶端裝置**頁面上，下載您的安全資源，檢閱資訊，然後選擇**完成**。
**注意**  
啟用串流管理員時，您必須先在核心裝置上[安裝 Java 8 執行時間](stream-manager.md#stream-manager-requirements)，才能部署群組。

 

### 啟用或停用現有群組 (主控台) 的串流管理員
<a name="enable-stream-manager-console-existing-group"></a>

1. <a name="console-gg-groups"></a>在 AWS IoT 主控台導覽窗格的**管理**下，展開 **Greengrass 裝置**，然後選擇**群組 (V1)**。

1. <a name="group-choose-target-group"></a>選擇目標群組。

1. 選擇 **Lambda 函數索引標籤**。

1. 在 **System Lambda 函數**下，選取**串流管理員**，然後選擇**編輯**。

1. 檢查啟用或停用狀態。還會顯示已設定的任何自訂串流管理員設定。

 

### 變更串流管理員設定 (主控台)
<a name="change-stream-manager-console"></a>

1. <a name="console-gg-groups"></a>在 AWS IoT 主控台導覽窗格的**管理**下，展開 **Greengrass 裝置**，然後選擇**群組 (V1)**。

1. <a name="group-choose-target-group"></a>選擇目標群組。

1. 選擇 **Lambda 函數索引標籤**。

1. 在 **System Lambda 函數**下，選取**串流管理員**，然後選擇**編輯**。

1. 檢查啟用或停用狀態。還會顯示已設定的任何自訂串流管理員設定。

1. 選擇**儲存**。

## 設定串流管理員設定 (CLI)
<a name="configure-stream-manager-cli"></a>

在 中 AWS CLI，使用系統 `GGStreamManager` Lambda 函數來設定串流管理員。System Lambda 函數是 AWS IoT Greengrass Core 軟體的元件。對於串流管理員和其他一些系統 Lambda 函數，您可以透過管理 Greengrass 群組中對應的 `Function`和 `FunctionDefinitionVersion` 物件來設定 Greengrass 功能。如需詳細資訊，請參閱[AWS IoT Greengrass 群組物件模型概觀](deployments.md#api-overview)。

您可以使用 API 執行下列管理任務。本節中的範例示範如何使用 AWS CLI，但您也可以直接呼叫 AWS IoT Greengrass API AWS 或使用 SDK。
+ [檢查是否已啟用串流管理員](#check-stream-manager-cli)
+ [啟用、停用或設定串流管理員](#enable-stream-manager-cli)

變更會在部署群組之後生效。如需說明如何使用與串流管理員互動的 Lambda 函數部署 Greengrass 群組的教學課程，請參閱 [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)。

**提示**  
若要查看串流管理員是否已啟用並從核心裝置執行，您可以在裝置上的終端機中執行下列命令。  

```
ps aux | grep -i 'streammanager'
```

 

### 檢查是否已啟用串流管理員 (CLI)
<a name="check-stream-manager-cli"></a>

如果您部署的函數定義版本包含系統 `GGStreamManager` Lambda 函數，則會啟用串流管理員。若要檢查，請執行下列動作：

1. <a name="get-group-id-latestversion"></a>取得目標 Greengrass 群組 ID 和目標群組版本 ID。此程序假設這是最新的群組和群組版本。下列查詢會傳回最近建立的群組。

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   或者，您可以依名稱查詢。群組名稱不需要是唯一名稱，因此可能會傳回多個群組。

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**注意**  
<a name="find-group-ids-console"></a>您也可以在 AWS IoT 主控台中找到這些值。群組 ID 會顯示在群組的 **Settings (設定)** 頁面上。群組版本 IDs會顯示在群組的**部署**索引標籤上。

1. <a name="copy-group-id-latestversion"></a>從輸出中的目標群組複製 `Id` 和 `LatestVersion` 值。

1. <a name="get-latest-group-version"></a>取得最新的群組版本。
   + 將 *group-id* 取代為您複製`Id`的 。
   + 將 *latest-group-version-id* 取代為您複製`LatestVersion`的 。

   ```
   aws greengrass get-group-version \
   --group-id group-id \
   --group-version-id latest-group-version-id
   ```

1. 從輸出中的 `FunctionDefinitionVersionArn`，取得函數定義的 ID 和函數定義版本。
   + 函數定義 ID 是遵循 Amazon Resource Name (ARN) 中客`functions`群的 GUID。
   + 函數定義版本 ID 是遵循 ARN 中的 `versions` 區段的 GUID。

   ```
   arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/functions/function-definition-id/versions/function-definition-version-id
   ```

1. 取得函數定義版本。
   + 用函數定義 ID 替換 *function-definition-id*。
   + 用函數定義版本 ID 替換 *function-definition-version-id*。

   ```
   aws greengrass get-function-definition-version \
   --function-definition-id function-definition-id \
   --function-definition-version-id function-definition-version-id
   ```

如果輸出中的 `functions` 陣列包括 `GGStreamManager` 函數，則會啟用串流管理員。為函數定義的任何環境變數代表串流管理員的自訂設定。

### 若要啟用、停用或設定串流管理員 (CLI)
<a name="enable-stream-manager-cli"></a>

在 中 AWS CLI，使用系統 `GGStreamManager` Lambda 函數來設定串流管理員。變更會在部署群組之後生效。
+ 若要啟用串流管理員，請加入函數定義版本的 `functions` 陣列中的 `GGStreamManager`。若要設定自訂設定，請為對應的[串流管理員參數](#stream-manager-parameters)定義環境變數。
+ 若要停用串流管理員，請從函數定義版本的 `functions` 陣列中移除 `GGStreamManager`。

**採用預設設定的串流管理員**  
下列範例設定會以預設設定啟用串流管理員。它將任意函數 ID 設為 `streamManager`。  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```
對於 `FunctionConfiguration` 屬性，您可能知道下列事項：  
+ `MemorySize` 設定為具有預設設定的 4194304 KB (4 GB)。您可以隨時變更此值。我們建議您將 `MemorySize`設定為至少 128000 KB。
+ `Pinned` 必須設定為 `true`。
+ 函數定義版本需要 `Timeout`，但 `GGStreamManager` 不會使用它。

**採用自訂設定的串流管理員**  <a name="enable-stream-manager-custom-settings"></a>
下列範例組態可讓串流管理員具有儲存目錄、伺服器連接埠和執行緒集區大小參數的自訂值。  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "Environment": {
            "Variables": {
                "STREAM_MANAGER_STORE_ROOT_DIR": "/data",
                "STREAM_MANAGER_SERVER_PORT": "1234",
                "STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE": "4"
            }
        },
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```
AWS IoT Greengrass 對於未指定為環境變數的[串流管理員參數](#stream-manager-parameters)， 會使用預設值。

**具有 Amazon S3 匯出自訂設定的串流管理員**  <a name="enable-stream-manager-custom-settings-s3"></a>
下列範例組態可讓串流管理員具有上傳目錄的自訂值，以及最小分段上傳大小參數。  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "Environment": {
            "Variables": {
                "STREAM_MANAGER_READ_ONLY_DIRS": "/mnt/directory-1,/mnt/directory-2,/tmp",
                "STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES": "10485760"
            }
        },
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```

 

**若要啟用、停用或設定串流管理員 (CLI)**

1. <a name="get-group-id-latestversion"></a>取得目標 Greengrass 群組 ID 和目標群組版本 ID。此程序假設這是最新的群組和群組版本。下列查詢會傳回最近建立的群組。

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   或者，您可以依名稱查詢。群組名稱不需要是唯一名稱，因此可能會傳回多個群組。

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**注意**  
<a name="find-group-ids-console"></a>您也可以在 AWS IoT 主控台中找到這些值。群組 ID 會顯示在群組的 **Settings (設定)** 頁面上。群組版本 IDs會顯示在群組的**部署**索引標籤上。

1. <a name="copy-group-id-latestversion"></a>從輸出中的目標群組複製 `Id` 和 `LatestVersion` 值。

1. <a name="get-latest-group-version"></a>取得最新的群組版本。
   + 將 *group-id* 取代為您複製`Id`的 。
   + 將 *latest-group-version-id* 取代為您複製`LatestVersion`的 。

   ```
   aws greengrass get-group-version \
   --group-id group-id \
   --group-version-id latest-group-version-id
   ```

1. 從輸出複製 `CoreDefinitionVersionArn`和所有其他版本 ARNs，但 除外`FunctionDefinitionVersionArn`。您稍後會在建立群組版本時使用這些值。

1. <a name="parse-function-def-id"></a>從輸出中的 `FunctionDefinitionVersionArn`，複製函數定義的 ID。該 ID 是 ARN 中跟在 `functions` 區段後面的 GUID，如下列範例所示。

   ```
   arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/functions/bcfc6b49-beb0-4396-b703-6dEXAMPLEcu5/versions/0f7337b4-922b-45c5-856f-1aEXAMPLEsf6
   ```
**注意**  
或者，您可以執行 [https://docs.aws.amazon.com/cli/latest/reference/greengrass/create-function-definition.html](https://docs.aws.amazon.com/cli/latest/reference/greengrass/create-function-definition.html)命令，然後從輸出複製 ID 來建立函數定義。

1. <a name="enable-stream-manager-function-definition-version"></a>新增函數定義版本至函數定義。
   + 將 *function-definition-id* 取代為您為函數定義複製的 `Id`。
   + 在`functions`陣列中，包含您要在 Greengrass 核心上使用的所有其他函數。您可以使用 `get-function-definition-version` 命令來取得現有函數的清單。

      
**以預設設定啟用串流管理員**  
下列範例透過在`functions`陣列中包含 `GGStreamManager`函數來啟用串流管理員。此範例使用[串流管理員參數](#stream-manager-parameters)的預設值。  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
               "FunctionConfiguration": {
                   "MemorySize":  4194304,
                   "Pinned": true,
                   "Timeout": 3
               },
               "Id": "streamManager"
           },
           {    
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
       ]
   }'
   ```
範例中的 `myLambdaFunction`函數代表其中一個使用者定義的 Lambda 函數。  
**用自訂設定啟用串流管理員**  
下列範例會藉由在 `functions` 陣列中包括 `GGStreamManager` 函數來啟用串流管理員。除非您想要變更預設值，否則所有串流管理員設定都是選用的。此範例說明如何使用環境變數來設定自訂值。  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
               "FunctionConfiguration": {
                   "Environment": {
                       "Variables": {
                           "STREAM_MANAGER_STORE_ROOT_DIR": "/data",
                           "STREAM_MANAGER_SERVER_PORT": "1234",
                           "STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE": "4"
                       }
                   },
                   "MemorySize":  4194304,
                   "Pinned": true,
                   "Timeout": 3
               },
               "Id": "streamManager"
           },
           {    
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
       ]
   }'
   ```
對於 `FunctionConfiguration` 屬性，您可能知道下列事項：  
   + `MemorySize` 設定為具有預設設定的 4194304 KB (4 GB)。您可以隨時變更此值。我們建議您將 `MemorySize`設定為至少 128000 KB。
   + `Pinned` 必須設定為 `true`。
   + 函數定義版本需要 `Timeout`，但 `GGStreamManager` 不會使用它。  
**停用串流管理員**  
下列範例會省略停用串流管理員的 `GGStreamManager` 函數。  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {       
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
       ]
   }'
   ```
如果您不想部署任何 Lambda 函數，您可以完全省略函數定義版本。

1. <a name="copy-function-def-version-arn"></a>從輸出複製函數定義版本的 `Arn`。

1. <a name="create-group-version-with-sys-lambda"></a>建立包含系統 Lambda 函數的群組版本。
   + 為該群組將 *group-id* 取代為 `Id`。
   + 將 *core-definition-version-arn* 取代為您從最新群組版本所複製的 `CoreDefinitionVersionArn`。
   + 將 *function-definition-version-arn* 取代為您為功能定義版本所複製的 `Arn`。
   + 取代您已從最新群組版本複製之其他群組元件 (例如 `SubscriptionDefinitionVersionArn` 或 `DeviceDefinitionVersionArn`) 的 ARN。
   + 移除任何未使用的參數。比如說若您的群組版本不包含任何資源，則請移除 `--resource-definition-version-arn`。

   ```
   aws greengrass create-group-version \
   --group-id group-id \
   --core-definition-version-arn core-definition-version-arn \
   --function-definition-version-arn function-definition-version-arn \
   --device-definition-version-arn device-definition-version-arn \
   --logger-definition-version-arn logger-definition-version-arn \
   --resource-definition-version-arn resource-definition-version-arn \
   --subscription-definition-version-arn subscription-definition-version-arn
   ```

1. <a name="copy-group-version-id"></a>從輸出複製 `Version`。這是新群組版本的 ID。

1. <a name="create-group-deployment"></a>部署具有新群組版本的群組。
   + 將 *group-id* 取代為您為群組所複製的 `Id`。
   + 將 *group-version-id* 取代為您為新群組版本所複製的 `Version`。

   ```
   aws greengrass create-deployment \
   --group-id group-id \
   --group-version-id group-version-id \
   --deployment-type NewDeployment
   ```

 

如果您想要稍後再次編輯串流管理員設定，請遵循此程序。請務必建立函數定義版本，其中包含具有更新組態的`GGStreamManager`函數。群組版本必須參考您要部署到核心的所有元件版本 ARNs。變更會在部署群組之後生效。

## 另請參閱
<a name="configure-stream-manager-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的資料串流](stream-manager.md)
+ [使用 StreamManagerClient 搭配串流](work-with-streams.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)
+ [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)

# 使用 StreamManagerClient 搭配串流
<a name="work-with-streams"></a>

在 AWS IoT Greengrass 核心上執行的使用者定義 Lambda 函數可以使用[AWS IoT Greengrass 核心 SDK](lambda-functions.md#lambda-sdks) 中的 `StreamManagerClient` 物件，在串流[管理員](stream-manager.md)中建立串流，然後與串流互動。當 Lambda 函數建立串流時，它會定義串流的 AWS 雲端 目的地、優先順序和其他匯出和資料保留政策。若要將資料傳送至串流管理員，Lambda 函數會將資料附加至串流。如果為串流定義匯出目的地，串流管理員會自動匯出串流。

**注意**  
<a name="stream-manager-clients"></a>一般而言，串流管理員的用戶端是使用者定義的 Lambda 函數。如果您的商業案例需要，您也可以允許在 Greengrass 核心 （例如 Docker 容器） 上執行的非 Lambda 程序與串流管理員互動。如需詳細資訊，請參閱[用戶端身分驗證](stream-manager.md#stream-manager-security-client-authentication)。

本主題中的程式碼片段會示範用戶端如何呼叫`StreamManagerClient`方法以使用串流。如需方法及其引數的實作詳細資訊，請使用每個程式碼片段後列出的 SDK 參考連結。如需包含完整 Python Lambda 函數的教學課程，請參閱 [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)或 [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)。

您的 Lambda 函數應該在函數處理常式`StreamManagerClient`之外執行個體化。如果在處理常式內執行個體化，則該函數在每次叫用時都會建立 `client` 以及與串流管理員的連線。

**注意**  
如果您在處理常式內將 `StreamManagerClient` 執行個體化，則必須在 `client` 完成其工作時明確呼叫 `close()` 方法。否則，`client` 會將連線保持在開啟狀態，並將另一個執行緒保持在執行狀態，直到指令碼結束為止。

`StreamManagerClient` 支援下列操作：
+ [建立訊息串流](#streammanagerclient-create-message-stream)
+ [附加訊息](#streammanagerclient-append-message)
+ [讀取訊息](#streammanagerclient-read-messages)
+ [列出串流](#streammanagerclient-list-streams)
+ [描述訊息串流](#streammanagerclient-describe-message-stream)
+ [更新訊息串流](#streammanagerclient-update-message-stream)
+ [刪除訊息串流](#streammanagerclient-delete-message-stream)

## 建立訊息串流
<a name="streammanagerclient-create-message-stream"></a>

若要建立串流，使用者定義的 Lambda 函數會呼叫建立方法並傳入`MessageStreamDefinition`物件。此物件會指定串流的唯一名稱，並定義串流管理員在達到串流大小上限時應如何處理新資料。您可以使用 `MessageStreamDefinition` 及其資料類型 (例如 `ExportDefinition`、`StrategyOnFull` 和 `Persistence`) 來定義其他串流屬性。其中包含：
+ 自動匯出的目標 AWS IoT SiteWise、 AWS IoT Analytics Kinesis Data Streams 和 Amazon S3 目的地。如需詳細資訊，請參閱[匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)。
+ 匯出優先順序。串流管理員會先匯出優先順序較高的串流，然後再匯出較低的串流。
+  AWS IoT Analytics、Kinesis Data Streams 和目的地的批次大小上限和 AWS IoT SiteWise 批次間隔。符合任一條件時，串流管理員會匯出訊息。
+ 存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後，資料可能不會立即刪除。
+ 串流持久性。選擇此選項可將串流儲存至檔案系統，以便在核心重新啟動期間保留資料，或將串流儲存在記憶體中。
+ 起始序號。指定要用作匯出中起始訊息的訊息序號。

如需 的詳細資訊`MessageStreamDefinition`，請參閱目標語言的 SDK 參考：
+ Java 開發套件中的 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html) 
+ Node.js SDK 中的 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html) 
+ Python SDK 中的 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition) 

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 也提供目標目的地，可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。

建立串流後，您的 Lambda 函數可以將[訊息附加](#streammanagerclient-append-message)至串流，以傳送資料進行匯出，並從串流[讀取訊息](#streammanagerclient-append-message)以進行本機處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一個策略是為 AWS IoT Analytics 或 Kinesis 資料串流中的每個目標頻道建立串流，但您可以定義串流的多個目標。串流的生命週期相當耐久。

### 要求
<a name="streammanagerclient-create-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

**注意**  
使用 AWS IoT SiteWise 或 Amazon S3 匯出目的地建立串流有下列需求：  
<a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

### 範例
<a name="streammanagerclient-create-message-stream-examples"></a>

以下程式碼片段會建立名為 `StreamName` 的串流。它定義 `MessageStreamDefinition`和次級資料類型中的串流屬性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS 雲端.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[create\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.create_message_stream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)  // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)  // Default is 16 MB.
                    .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                    .withPersistence(Persistence.File)  // Default is File.
                    .withFlushOnWrite(false)  // Default is false.
                    .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 雲端.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3TaskExecutor(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456)  // Default is 256 MB.
                .withStreamSegmentSize(16777216)  // Default is 16 MB.
                .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                .withPersistence(Persistence.File)  // Default is File.
                .withFlushOnWrite(false)  // Default is false.
                .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 雲端.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                        .withIotSitewise(null)
                        .withS3TaskExecutor(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

如需設定匯出目的地的詳細資訊，請參閱 [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)。

 

## 附加訊息
<a name="streammanagerclient-append-message"></a>

若要將資料傳送至串流管理員以進行匯出，您的 Lambda 函數會將資料附加至目標串流。匯出目的地會決定要傳遞至此方法的資料類型。

### 要求
<a name="streammanagerclient-append-message-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

**注意**  
使用 AWS IoT SiteWise 或 Amazon S3 匯出目的地附加訊息有下列需求：  
<a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

### 範例
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics 或 Kinesis Data Streams 匯出目的地
<a name="streammanagerclient-append-message-blob"></a>

下列程式碼片段附加一個訊息到名為 `StreamName` 的串流。對於 AWS IoT Analytics 或 Kinesis Data Streams 目的地，您的 Lambda 函數會附加資料的 Blob。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise 匯出目的地
<a name="streammanagerclient-append-message-sitewise"></a>

下列程式碼片段附加一個訊息到名為 `StreamName` 的串流。對於 AWS IoT SiteWise 目的地，您的 Lambda 函數會附加序列化`PutAssetPropertyValueEntry`物件。如需詳細資訊，請參閱[匯出至 AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>當您傳送資料至 時 AWS IoT SiteWise，您的資料必須符合 `BatchPutAssetPropertyValue`動作的要求。如需詳細資訊，請參閱《AWS IoT SiteWise API 參考》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.PutAssetPropertyValueEntry)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =  new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)

------

#### Amazon S3 匯出目的地
<a name="streammanagerclient-append-message-export-task"></a>

下列程式碼片段會將匯出任務附加至名為 的串流`StreamName`。對於 Amazon S3 目的地，您的 Lambda 函數會附加序列化`S3ExportTaskDefinition`物件，其中包含來源輸入檔案和目標 Amazon S3 物件的相關資訊。如果指定的物件不存在，串流管理員會為您建立它。如需詳細資訊，請參閱[匯出至 Amazon S3](stream-export-configurations.md#export-streams-to-s3)。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

 

## 讀取訊息
<a name="streammanagerclient-read-messages"></a>

從串流讀取訊息。

### 要求
<a name="streammanagerclient-read-messages-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-read-messages-examples"></a>

下列程式碼片段可從名為 `StreamName` 的串流讀取訊息。讀取方法需要一個選用的 `ReadMessagesOptions` 物件以指定序號，從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[Read\$1messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.ReadMessagesOptions)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)

------

 

## 列出串流
<a name="streammanagerclient-list-streams"></a>

取得串流管理員中的串流清單。

### 要求
<a name="streammanagerclient-list-streams-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-list-streams-examples"></a>

下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[list\$1streams](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[listStreams](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[listStreams](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

 

## 描述訊息串流
<a name="streammanagerclient-describe-message-stream"></a>

取得串流的中繼資料，包括串流定義、大小和匯出狀態。

### 要求
<a name="streammanagerclient-describe-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-describe-message-stream-examples"></a>

下列程式碼片段可獲取名為 `StreamName` 的串流相關的中繼資料，包括串流定義、大小和匯出工具狀態。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考： [describe\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考： [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考： [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

 

## 更新訊息串流
<a name="streammanagerclient-update-message-stream"></a>

更新現有串流的屬性。如果您的需求在建立串流之後有所變更，建議您更新串流。例如：
+ 新增 AWS 雲端 目的地的新[匯出組態](stream-export-configurations.md)。
+ 增加串流的大小上限，以變更資料匯出或保留的方式。例如，串流大小結合您在完整設定上的策略，可能會導致資料遭到刪除或拒絕，然後串流管理員才能處理資料。
+ 暫停和繼續匯出；例如，如果匯出任務長時間執行，而且您想要對上傳資料進行評價。

您的 Lambda 函數遵循此高階程序來更新串流：

1. [取得串流的描述。](#streammanagerclient-describe-message-stream)

1. 更新對應`MessageStreamDefinition`和次級物件的目標屬性。

1. 在更新的 中傳遞 `MessageStreamDefinition`。請務必包含更新串流的完整物件定義。未定義的屬性會還原為預設值。

   您可以指定要用作匯出中起始訊息的訊息序號。

### 要求
<a name="-streammanagerclient-update-message-streamreqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

### 範例
<a name="streammanagerclient-update-message-stream-examples"></a>

下列程式碼片段會更新名為 的串流`StreamName`。它會更新匯出至 Kinesis Data Streams 之串流的多個屬性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(  
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.update_message_stream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)  // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS 雲端.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[update\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)  // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)  // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)  // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)  // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)  // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)  // Default is false. Updating to true.
                .withExportDefinition(  
                    // Optional. Choose where/how the stream is exported to the AWS 雲端.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### 更新串流的限制
<a name="streammanagerclient-update-constraints"></a>

更新串流時，適用下列限制條件。除非以下清單中另有說明，否則更新會立即生效。
+ 您無法更新串流的持久性。若要變更此行為，[請刪除串流](#streammanagerclient-delete-message-stream)並[建立定義新持久性政策的串流](#streammanagerclient-create-message-stream)。
+ 您只能在下列情況下更新串流的大小上限：
  + 大小上限必須大於或等於串流的目前大小。<a name="messagestreaminfo-describe-stream"></a>若要尋找此資訊，[請描述串流](#streammanagerclient-describe-message-stream)，然後檢查傳回`MessageStreamInfo`物件的儲存狀態。
  + 大小上限必須大於或等於串流的區段大小。
+ 您可以將串流區段大小更新為小於串流大小上限的值。更新的設定會套用至新的客群。
+ 存留時間 (TTL) 屬性的更新適用於新的附加操作。如果您減少此值，串流管理員也可能會刪除超過 TTL 的現有區段。
+ 完整屬性策略的更新適用於新的附加操作。如果您設定策略來覆寫最舊的資料，串流管理員也可能會根據新設定覆寫現有的區段。
+ 寫入屬性排清的更新會套用至新訊息。
+ 匯出組態的更新會套用至新的匯出。更新請求必須包含您要支援的所有匯出組態。否則，串流管理員會刪除它們。
  + 當您更新匯出組態時，請指定目標匯出組態的識別符。
  + 若要新增匯出組態，請指定新匯出組態的唯一識別符。
  + 若要刪除匯出組態，請省略匯出組態。
+ 若要[更新](#streammanagerclient-update-message-stream)串流中匯出組態的起始序號，您必須指定小於最新序號的值。<a name="messagestreaminfo-describe-stream"></a>若要尋找此資訊，[請描述串流](#streammanagerclient-describe-message-stream)，然後檢查傳回`MessageStreamInfo`物件的儲存狀態。

 

## 刪除訊息串流
<a name="streammanagerclient-delete-message-stream"></a>

刪除串流。刪除串流時，磁碟中該串流的所有儲存資料都會刪除。

### 要求
<a name="streammanagerclient-delete-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-delete-message-stream-examples"></a>

下列程式碼片段會刪除名為 `StreamName` 的串流。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[Delete\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## 另請參閱
<a name="work-with-streams-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的資料串流](stream-manager.md)
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)
+ [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)
+ `StreamManagerClient` AWS IoT Greengrass 核心 SDK 參考中的 ：
  + [Python](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html)
  + [Java](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# 匯出支援 AWS 雲端 目的地的組態
<a name="stream-export-configurations"></a>

使用者定義的 Lambda 函數使用 AWS IoT Greengrass 核心 SDK `StreamManagerClient`與串流管理員互動。當 Lambda [函數建立串流](work-with-streams.md#streammanagerclient-create-message-stream)或[更新串流](work-with-streams.md#streammanagerclient-create-message-stream)時，它會傳遞代表串流屬性的`MessageStreamDefinition`物件，包括匯出定義。`ExportDefinition` 物件包含為串流定義的匯出組態。串流管理員使用這些匯出組態來判斷匯出串流的位置和方式。

![\[ExportDefinition 屬性類型的物件模型圖表。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-exportconfigs.png)


您可以在串流上定義零或多個匯出組態，包括單一目的地類型的多個匯出組態。例如，您可以將串流匯出至兩個 AWS IoT Analytics 頻道和一個 Kinesis 資料串流。

對於失敗的匯出嘗試，串流管理員會持續重試 AWS 雲端 以最多五分鐘的間隔將資料匯出至 。重試嘗試次數沒有上限。

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 也提供目標目的地，可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。

**Topics**
+ [AWS IoT Analytics 頻道](#export-to-iot-analytics)
+ [Amazon Kinesis 資料串流](#export-to-kinesis)
+ [AWS IoT SiteWise 資產屬性](#export-to-iot-sitewise)
+ [Amazon S3 物件](#export-to-s3)

您負責維護這些 AWS 雲端 資源。

## AWS IoT Analytics 頻道
<a name="export-to-iot-analytics"></a>

串流管理員支援自動匯出至 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 可讓您對資料執行進階分析，以協助做出商業決策並改善機器學習模型。如需詳細資訊，請參閱*AWS IoT Analytics 《 使用者指南*》中的[什麼是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html)。

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `IoTAnalyticsConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTAnalyticsConfig) 
+ Java 開發套件中的 [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html) 
+ Node.js SDK 中的 [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html) 

### 要求
<a name="export-to-iot-analytics-reqs"></a>

此匯出目的地有下列需求：
+ 中的目標頻道 AWS IoT Analytics 必須與 Greengrass 群組位於相同 AWS 帳戶 和 AWS 區域 。
+ 必須[Greengrass 群組角色](group-role.md)允許將 `iotanalytics:BatchPutMessage`許可設為目標頻道。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "iotanalytics:BatchPutMessage"
              ],
              "Resource": [
              "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
      "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

若要建立匯出至 的串流 AWS IoT Analytics，您的 Lambda [函數會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`IoTAnalyticsConfig`物件的匯出定義串流。此物件會定義匯出設定，例如目標頻道、批次大小、批次間隔和優先順序。

當您的 Lambda 函數從裝置接收資料時，它們會將包含資料 Blob [的訊息附加](work-with-streams.md#streammanagerclient-append-message)到目標串流。

然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

 

## Amazon Kinesis 資料串流
<a name="export-to-kinesis"></a>

串流管理員支援自動匯出至 Amazon Kinesis Data Streams。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用於彙總大量資料，並將其載入資料倉儲或地圖縮減叢集。如需詳細資訊，請參閱[《Amazon Kinesis 開發人員指南》中的什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。 *Amazon Kinesis *

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `KinesisConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.KinesisConfig) 
+ Java 開發套件中的 [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html) 
+ Node.js SDK 中的 [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html) 

### 要求
<a name="export-to-kinesis-reqs"></a>

此匯出目的地有下列需求：
+ Kinesis Data Streams 中的目標串流必須與 Greengrass 群組位於相同 AWS 帳戶 和 AWS 區域 。
+ 必須[Greengrass 群組角色](group-role.md)允許 以資料串流為目標的`kinesis:PutRecords`許可。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
      "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

若要建立匯出至 Kinesis Data Streams 的串流，您的 Lambda [函數會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`KinesisConfig`物件的匯出定義串流。此物件會定義匯出設定，例如目標資料串流、批次大小、批次間隔和優先順序。

當您的 Lambda 函數從裝置接收資料時，它們會將包含資料 Blob [的訊息附加](work-with-streams.md#streammanagerclient-append-message)到目標串流。然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

串流管理員會為每個上傳至 Amazon Kinesis 的記錄產生唯一的隨機 UUID 做為分割區索引鍵。

 

## AWS IoT SiteWise 資產屬性
<a name="export-to-iot-sitewise"></a>

串流管理員支援自動匯出至 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 可讓您大規模收集、組織和分析工業設備的資料。如需詳細資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的[什麼是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html)。

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `IoTSiteWiseConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTSiteWiseConfig) 
+ Java 開發套件中的 [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html) 
+ Node.js SDK 中的 [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html) 

**注意**  
AWS 也提供 [IoT SiteWise 連接器](iot-sitewise-connector.md)，這是您可以與 OPC-UA 來源搭配使用的預先建置解決方案。

### 要求
<a name="export-to-iot-sitewise-reqs"></a>

此匯出目的地有下列需求：
+ 中的目標資產屬性 AWS IoT SiteWise 必須與 Greengrass 群組位於相同 AWS 帳戶 和 AWS 區域 。
**注意**  
如需 AWS IoT SiteWise 支援的區域清單，請參閱《 *AWS 一般參考*》中的[AWS IoT SiteWise 端點和配額](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region)。
+ 必須[Greengrass 群組角色](group-role.md)允許 以資產屬性為目標的`iotsitewise:BatchPutAssetPropertyValue`許可。下列範例政策使用 `iotsitewise:assetHierarchyPath`條件索引鍵授予對目標根資產及其子資產的存取權。您可以從`Condition`政策中移除 ，以允許存取所有 AWS IoT SiteWise 資產或指定個別資產ARNs。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
               "Effect": "Allow",
               "Action": "iotsitewise:BatchPutAssetPropertyValue",
               "Resource": "*",
               "Condition": {
                   "StringLike": {
                       "iotsitewise:assetHierarchyPath": [
                           "/root node asset ID",
                           "/root node asset ID/*"
                       ]
                   }
               }
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

  如需重要的安全性資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的 [ BatchPutAssetPropertyValue 授權](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action)。

### 匯出至 AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

若要建立匯出至 的串流 AWS IoT SiteWise，您的 Lambda [函數會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`IoTSiteWiseConfig`物件的匯出定義串流。此物件會定義匯出設定，例如批次大小、批次間隔和優先順序。

當您的 Lambda 函數從裝置接收資產屬性資料時，它們會將包含資料的訊息附加到目標串流。訊息是 JSON 序列化`PutAssetPropertyValueEntry`物件，其中包含一或多個資產屬性的屬性值。如需詳細資訊，請參閱[附加訊息](work-with-streams.md#streammanagerclient-append-message-sitewise)以取得 AWS IoT SiteWise 匯出目的地。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>當您傳送資料至 時 AWS IoT SiteWise，您的資料必須符合 `BatchPutAssetPropertyValue`動作的要求。如需詳細資訊，請參閱《AWS IoT SiteWise API 參考》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

 

您可以調整串流管理員設定和 Lambda 函數邏輯，以設計匯出策略。例如：
+ 對於近乎即時的匯出，請設定低批次大小和間隔設定，並在接收資料時將資料附加至串流。
+ 為了最佳化批次處理、降低頻寬限制或將成本降至最低，您的 Lambda 函數可以在將資料附加至串流之前，合併單一資產屬性收到的timestamp-quality-value(TQV) 資料點。一種策略是在一個訊息中批次處理最多 10 個不同的屬性資產組合或屬性別名的項目，而不是為相同的屬性傳送多個項目。這有助於串流管理員保持在[AWS IoT SiteWise 配額](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html)內。

 

## Amazon S3 物件
<a name="export-to-s3"></a>

串流管理員支援自動匯出至 Amazon S3。<a name="s3-export-destination"></a>您可以使用 Amazon S3 來存放和擷取大量資料。如需詳細資訊，請參閱[《Amazon Simple Storage Service 開發人員指南》中的什麼是 Amazon S3？](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)。 **

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `S3ExportTaskExecutorConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskExecutorConfig) 
+ Java 開發套件中的 [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html) 
+ Node.js SDK 中的 [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html) 

### 要求
<a name="export-to-s3-reqs"></a>

此匯出目的地有下列需求：
+ 目標 Amazon S3 儲存貯體必須與 Greengrass 群組 AWS 帳戶 位於相同的 中。
+ 如果 Greengrass 群組[的預設容器化](lambda-group-config.md#lambda-containerization-groupsettings)是 **Greengrass 容器**，您必須將 [STREAM\$1MANAGER\$1READ\$1ONLY\$1DIRS](configure-stream-manager.md#stream-manager-read-only-directories) 參數設定為使用根檔案系統下`/tmp`或不在其中的輸入檔案目錄。
+ 如果在 **Greengrass 容器**模式下執行的 Lambda 函數將輸入檔案寫入輸入檔案目錄，您必須為目錄建立本機磁碟區資源，並將目錄掛載到具有寫入許可的容器。這可確保檔案寫入根檔案系統，並在容器外部可見。如需詳細資訊，請參閱[使用 Lambda 函數和連接器存取本機資源](access-local-resources.md)。
+ 必須[Greengrass 群組角色](group-role.md)允許目標儲存貯體的下列許可。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "s3:PutObject",
                  "s3:AbortMultipartUpload",
                  "s3:ListMultipartUploadParts"
              ],
              "Resource": [
                  "arn:aws:s3:::bucket-1-name/*",
                  "arn:aws:s3:::bucket-2-name/*"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 Amazon S3
<a name="export-streams-to-s3"></a>

若要建立匯出至 Amazon S3 的串流，您的 Lambda 函數會使用 `S3ExportTaskExecutorConfig` 物件來設定匯出政策。政策會定義匯出設定，例如分段上傳閾值和優先順序。對於 Amazon S3 匯出，串流管理員會上傳從核心裝置上的本機檔案讀取的資料。若要啟動上傳，您的 Lambda 函數會將匯出任務附加至目標串流。匯出任務包含輸入檔案和目標 Amazon S3 物件的相關資訊。串流管理員會依照任務附加至串流的順序來執行任務。

**注意**  
<a name="bucket-not-key-must-exist"></a>目標儲存貯體必須已存在於您的 中 AWS 帳戶。如果指定金鑰的物件不存在，串流管理員會為您建立物件。

 此高階工作流程如下圖所示。

![\[Amazon S3 匯出的串流管理員工作流程圖表。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-s3.png)


串流管理員會使用分段上傳閾值屬性、[最小組件大小](configure-stream-manager.md#stream-manager-minimum-part-size)設定，以及輸入檔案的大小來判斷如何上傳資料。分段上傳閾值必須大於或等於最小分段大小。如果您想要平行上傳資料，您可以建立多個串流。

指定目標 Amazon S3 物件的金鑰可以在`!{timestamp:value}`預留位置中包含有效的 [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 字串。您可以根據輸入檔案資料上傳的時間，使用這些時間戳記預留位置來分割 Amazon S3 中的資料。例如，下列金鑰名稱會解析為 等值`my-key/2020/12/31/data.txt`。

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**注意**  
如果您想要監控串流的匯出狀態，請先建立狀態串流，然後將匯出串流設定為使用它。如需詳細資訊，請參閱[監控匯出任務](#monitor-export-status-s3)。

#### 管理輸入資料
<a name="manage-s3-input-data"></a>

您可以編寫程式碼，讓 IoT 應用程式用來管理輸入資料的生命週期。下列工作流程範例示範如何使用 Lambda 函數來管理此資料。

1. 本機程序會從裝置或周邊接收資料，然後將資料寫入核心裝置上的目錄中的檔案。這些是串流管理員的輸入檔案。
**注意**  
若要判斷您是否必須設定輸入檔案目錄的存取權，請參閱 [STREAM\$1MANAGER\$1READ\$1ONLY\$1DIRS](configure-stream-manager.md#stream-manager-read-only-directories) 參數。  
串流管理員在 中執行的程序會繼承群組[預設存取身分](lambda-group-config.md#lambda-access-identity-groupsettings)的所有檔案系統許可。串流管理員必須具有存取輸入檔案的許可。如有必要，您可以使用 `chmod(1)`命令來變更檔案的許可。

1. Lambda 函數會掃描目錄，並在[建立新檔案時將匯出任務附加](work-with-streams.md#streammanagerclient-append-message-export-task)至目標串流。任務是 JSON 序列化`S3ExportTaskDefinition`物件，可指定輸入檔案的 URL、目標 Amazon S3 儲存貯體和金鑰，以及選用的使用者中繼資料。

1. 串流管理員會讀取輸入檔案，並依附加任務的順序將資料匯出至 Amazon S3。<a name="bucket-not-key-must-exist"></a>目標儲存貯體必須已存在於您的 中 AWS 帳戶。如果指定金鑰的物件不存在，串流管理員會為您建立物件。

1. Lambda 函數會從狀態串流[讀取訊息](work-with-streams.md#streammanagerclient-read-messages)，以監控匯出狀態。匯出任務完成後，Lambda 函數可以刪除對應的輸入檔案。如需詳細資訊，請參閱[監控匯出任務](#monitor-export-status-s3)。

### 監控匯出任務
<a name="monitor-export-status-s3"></a>

您可以編寫程式碼，讓 IoT 應用程式用來監控 Amazon S3 匯出的狀態。您的 Lambda 函數必須建立狀態串流，然後將匯出串流設定為將狀態更新寫入狀態串流。單一狀態串流可以從匯出至 Amazon S3 的多個串流接收狀態更新。

首先，[建立](work-with-streams.md#streammanagerclient-create-message-stream)要用作狀態串流的串流。您可以設定串流的大小和保留政策，以控制狀態訊息的生命週期。例如：
+ `Memory` 如果您不想儲存狀態訊息，請將 `Persistence`設定為 。
+ `StrategyOnFull` 設定為 ，`OverwriteOldestData`以免遺失新的狀態訊息。

然後，建立或更新匯出串流以使用狀態串流。具體而言，請設定串流`S3ExportTaskExecutorConfig`匯出組態的狀態組態屬性。這會通知串流管理員將有關匯出任務的狀態訊息寫入狀態串流。在 `StatusConfig` 物件中，指定狀態串流的名稱和詳細程度。下列支援的值範圍從最詳細 (`ERROR`) 到最詳細 (`TRACE`)。預設值為 `INFO`。
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

 

下列範例工作流程顯示 Lambda 函數如何使用狀態串流來監控匯出狀態。

1. 如前一個工作流程所述，Lambda 函數[會將匯出任務附加](work-with-streams.md#streammanagerclient-append-message-export-task)至設定為將有關匯出任務的狀態訊息寫入狀態串流的串流。附加操作會傳回代表任務 ID 的序號。

1. Lambda 函數會從狀態串流循序[讀取訊息](work-with-streams.md#streammanagerclient-read-messages)，然後根據串流名稱和任務 ID 或根據訊息內容的匯出任務屬性來篩選訊息。例如，Lambda 函數可以依匯出任務的輸入檔案 URL 進行篩選，其由訊息內容中的`S3ExportTaskDefinition`物件表示。

   下列狀態碼表示匯出任務已達到已完成狀態：
   + `Success`。 上傳已成功完成。
   + `Failure`。 串流管理員發生錯誤，例如，指定的儲存貯體不存在。解決問題後，您可以再次將匯出任務附加到串流。
   + `Canceled`。 任務因為串流或匯出定義已刪除，或任務的time-to-live(TTL) 期間過期而中止。
**注意**  
任務也可能的狀態為 `InProgress`或 `Warning`。當事件傳回不會影響任務執行的錯誤時，串流管理員會發出警告。例如，無法清除中止的部分上傳會傳回警告。

1. 匯出任務完成後，Lambda 函數可以刪除對應的輸入檔案。

下列範例顯示 Lambda 函數如何讀取和處理狀態訊息。

------
#### [ Python ]

```
import time
from greengrasssdk.stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from greengrasssdk.stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[read\$1messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.StatusMessage)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require('aws-greengrass-core-sdk').StreamManager;

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)

------

# 將資料串流匯出至 AWS 雲端 （主控台）
<a name="stream-manager-console"></a>

本教學課程說明如何使用 AWS IoT 主控台來設定和部署已啟用串流管理員的 AWS IoT Greengrass 群組。群組包含使用者定義的 Lambda 函數，可寫入串流管理員中的串流，然後自動匯出至 AWS 雲端。

串流管理員可讓擷取、處理和匯出大量資料串流更有效率且更可靠。在本教學課程中，您會建立使用 IoT 資料的 `TransferStream` Lambda 函數。Lambda 函數使用 AWS IoT Greengrass 核心 SDK 在串流管理員中建立串流，然後讀取和寫入該串流。串流管理員接著會將串流匯出至 Kinesis Data Streams。下圖顯示此工作流程。

![\[串流管理工作流程圖\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-scenario.png)


本教學課程的重點是展示使用者定義的 Lambda 函數如何使用 AWS IoT Greengrass Core SDK 中的 `StreamManagerClient` 物件與串流管理員互動。為了簡化，您為此教學課程建立的 Python Lambda 函數會產生模擬的裝置資料。

## 先決條件
<a name="stream-manager-console-prerequisites"></a>

為完成此教學課程您需要：<a name="stream-manager-howto-prereqs"></a>
+ Greengrass 群組和 Greengrass 核心 (1.10 版或更新版本）。如需如何建立 Greengrass 群組和核心的詳細資訊，請參閱 [入門 AWS IoT Greengrass](gg-gs.md)。入門教學課程也包含安裝 AWS IoT Greengrass 核心軟體的步驟。
**注意**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a>OpenWrt 發行版本不支援串流管理員。
+ 在核心裝置上安裝 Java 8 執行時間 (JDK 8)。<a name="install-java8-runtime-general"></a>
  + 針對以 Debian 為基礎的發行版本 (包括 Raspbian) 或以 Ubuntu 為基礎的發行版本，請執行下列命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 針對以 Red Hat 為基礎的發行版本 (包括 Amazon Linux)，請執行下列命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    如需詳細資訊，請參閱 OpenJDK 文件上的[如何下載和安裝預先建置的 OpenJDK 套件](https://openjdk.java.net/install/)。
+ AWS IoT Greengrass 適用於 Python 1.5.0 版或更新版本的核心 SDK。若要在適用於 Python AWS IoT Greengrass 的核心 SDK `StreamManagerClient`中使用 ，您必須：
  + 在核心裝置上安裝 Python 3.7 或更新版本。
  + 在 Lambda 函數部署套件中包含 SDK 及其相依性。本教學課程提供相關指示。
**提示**  
您可以搭配使用 `StreamManagerClient` 與 Java 或 NodeJS。如需程式碼範例，請參閱 GitHub 上的[AWS IoT Greengrass 適用於 Java 的 Core SDK](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) 和[AWS IoT Greengrass 適用於 Node.js 的 Core SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。
+ 在與 Greengrass 群組相同的 Amazon Kinesis Data Streams 中**MyKinesisStream**建立名為 AWS 區域 的目的地串流。如需詳細資訊，請參閱《*Amazon Kinesis 開發人員指南*》中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream)。
**注意**  
在本教學課程中，串流管理員會將資料匯出至 Kinesis Data Streams，這會向您的 收取費用 AWS 帳戶。如需定價的相關資訊，請參閱 [Kinesis Data Streams 定價](https://aws.amazon.com/kinesis/data-streams/pricing/)。  
若要避免產生費用，您可以執行本教學課程而不建立 Kinesis 資料串流。在此情況下，您會檢查日誌，以查看串流管理員嘗試將串流匯出至 Kinesis Data Streams。
+ 新增至 的 IAM 政策[Greengrass 群組角色](group-role.md)，允許對目標資料串流執行 `kinesis:PutRecords`動作，如下列範例所示：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }
  ```

------

本教學課程所述以下高階執行步驟：

1. [建立 Lambda 函數部署套件](#stream-manager-console-create-deployment-package)

1. [建立 Lambda 函式](#stream-manager-console-create-function)

1. [將函數新增到群組](#stream-manager-console-create-gg-function)

1. [啟用串流管理員](#stream-manager-console-enable-stream-manager)

1. [設定本機記錄](#stream-manager-console-configure-logging)

1. [部署群組](#stream-manager-console-create-deployment)

1. [測試應用程式。](#stream-manager-console-test-application)

此教學課程需約 20 分鐘完成。

## 步驟 1：建立 Lambda 函數部署套件
<a name="stream-manager-console-create-deployment-package"></a>

在此步驟中，您會建立包含 Python 函數程式碼和相依性的 Lambda 函數部署套件。您稍後在其中建立 Lambda 函數時上傳此套件 AWS Lambda。Lambda 函數使用 AWS IoT Greengrass 核心 SDK 來建立本機串流並與之互動。

**注意**  
 使用者定義的 Lambda 函數必須使用 [AWS IoT Greengrass 核心 SDK](lambda-functions.md#lambda-sdks-core) 與串流管理員互動。如需有關 Greengrass 串流管理員需求的詳細資訊，請參閱 [Greengrass 串流管理員需求](stream-manager.md#stream-manager-requirements)。

1.  下載[AWS IoT Greengrass 適用於 Python 的 核心 SDK](lambda-functions.md#lambda-sdks-core) 1.5.0 版或更新版本。

1. <a name="unzip-ggc-sdk"></a>解壓縮下載的封裝，以取得軟體開發套件。SDK 為 `greengrasssdk` 資料夾。

1. <a name="install-python-sdk-dependencies-stream-manager"></a>安裝套件相依性，以在 Lambda 函數部署套件中包含 開發套件。<a name="python-sdk-dependencies-stream-manager"></a>

   1. 前往包含 `requirements.txt` 檔案的軟體開發套件目錄。這個檔案會列出相依性。

   1. 安裝軟體開發套件相依性。例如，執行下列 `pip` 命令，將它們安裝在目前的目錄中：

      ```
      pip install --target . -r requirements.txt
      ```

1. 將以下 Python 程式碼函數儲存在名為 `transfer_stream.py` 的本機檔案中。
**提示**  
 如需使用 Java 和 NodeJS 的範例程式碼，請參閱 GitHub 上的[AWS IoT Greengrass 適用於 Java 的核心開發套件](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java)和[AWS IoT Greengrass 適用於 Node.js 的核心開發套件](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. 將下列項目壓縮成名為 `transfer_stream_python.zip` 的檔案。這是您的 Lambda 函數部署套件。
   + **transfer\$1stream.py**。應用程式邏輯。
   + **greengrasssdk**。發佈 MQTT 訊息的 Python Greengrass Lambda 函數所需的程式庫。

     [串流管理員操作](work-with-streams.md)可在適用於 Python 的核心 AWS IoT Greengrass SDK 1.5.0 版或更新版本中使用。
   + 您為適用於 Python AWS IoT Greengrass 的核心 SDK 安裝的相依性 （例如， `cbor2`目錄）。

   當您建立 `zip` 檔案時，請只包含這些項目，而非包含的資料夾。

## 步驟 2：建立 Lambda 函數
<a name="stream-manager-console-create-function"></a>

在此步驟中，您會使用 AWS Lambda 主控台來建立 Lambda 函數，並將其設定為使用您的部署套件。然後，您會發佈函數版本和建立別名。

1. 首先，建立 Lambda 函數。

   1. <a name="lambda-console-open"></a>在 中 AWS 管理主控台，選擇**服務**，然後開啟 AWS Lambda 主控台。

   1. <a name="lambda-console-create-function"></a>選擇**建立函數**，然後選擇**從頭開始撰寫**。

   1. 在 **Basic information (基本資訊)** 區段中，使用下列值：
      + 針對**函數名稱**，請輸入 **TransferStream**。
      + 針對**執行期**，選擇 **Python 3.7**。
      + 對於**許可**，請保留預設設定。這會建立授予基本 Lambda 許可的執行角色。不會使用此角色 AWS IoT Greengrass。

   1. <a name="lambda-console-save-function"></a>請在頁面底部，選擇**建立函式**。

1. 接著，註冊處理常式並上傳您的 Lambda 函數部署套件。

   1. <a name="lambda-console-upload"></a>在**程式碼**索引標籤的**程式碼來源**下，選擇**上傳來源**。從下拉式清單中，選擇 **.zip 檔案**。  
![\[從下拉式清單上傳，並反白顯示 .zip 檔案。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/lra-console/upload-deployment-package.png)

   1. 選擇**上傳**，然後選擇您的`transfer_stream_python.zip`部署套件。然後選擇 **Save (儲存)**。

   1. <a name="lambda-console-runtime-settings-para"></a>在函數的**程式碼**索引標籤的**執行時間設定**下，選擇**編輯**，然後輸入下列值。
      + 針對**執行期**，選擇 **Python 3.7**。
      + 對於 **Handler (處理常式)**，輸入 **transfer\$1stream.function\$1handler**。

   1. <a name="lambda-console-save-config"></a>選擇**儲存**。
**注意**  
 AWS Lambda 主控台上的**測試**按鈕不適用於此函數。 AWS IoT Greengrass 核心 SDK 不包含在 AWS Lambda 主控台中獨立執行 Greengrass Lambda 函數所需的模組。這些模組 （例如 `greengrass_common`) 會在部署到您的 Greengrass 核心之後提供給函數。

1. 現在，發佈 Lambda 函數的第一個版本，並為[該版本建立別名](https://docs.aws.amazon.com/lambda/latest/dg/versioning-aliases.html)。
**注意**  
Greengrass 群組可以依別名 （建議） 或版本參考 Lambda 函數。使用別名可讓您更輕鬆地管理程式碼更新，因為您不必在更新函數程式碼時變更訂閱資料表或群組定義。反之，您只需將別名指向新的函數版本。

   1. <a name="shared-publish-function-version"></a>請從**操作**功能表中選擇**發行新版本**。

   1. <a name="shared-publish-function-version-description"></a>針對 **Version description (版本描述)**，輸入 **First version**，然後選擇 **Publish (發佈)**。

   1. 從 **TransferStream: 1** 組態頁面上的 **Actions (動作)** 選單中選擇 **Create alias (建立別名)**。

   1. 在**建立警示**頁面上使用下列值：
      + 對於**名稱**，輸入 **GG\$1TransferStream**。
      + 對於**版本**，選擇 **1**。
**注意**  
AWS IoT Greengrass 不支援 **\$1LATEST** 版本的 Lambda 別名。

   1. 選擇**建立**。

現在您已準備好將 Lambda 函數新增至 Greengrass 群組。

## 步驟 3：將 Lambda 函數新增至 Greengrass 群組
<a name="stream-manager-console-create-gg-function"></a>

在此步驟中，您將 Lambda 函數新增至 群組，然後設定其生命週期和環境變數。如需詳細資訊，請參閱[使用群組特定的組態控制 Greengrass Lambda 函數的執行](lambda-group-config.md)。

1. <a name="console-gg-groups"></a>在 AWS IoT 主控台導覽窗格的**管理**下，展開 **Greengrass 裝置**，然後選擇**群組 (V1)**。

1. <a name="group-choose-target-group"></a>選擇目標群組。

1. <a name="choose-add-lambda"></a>在群組組態頁面上，選擇 **Lambda 函數**索引標籤。

1. 在**我的 Lambda 函數**下，選擇**新增**。

1. 在**新增 Lambda 函數**頁面上，為您的 **Lambda 函數**選擇 Lambda 函數。

1. 針對 **Lambda 版本**，選擇**別名：GG\$1TransferStream**。

   現在，設定屬性來決定 Greengrass 群組中 Lambda 函數的行為。

1. 在 **Lambda 函數組態**區段中，進行下列變更：
   + 將 **Memory limit (記憶體限制)** 設為 32 MB。
   + 針對**固定**，選擇 **True**。
**注意**  
<a name="long-lived-lambda"></a>*長期 *（或*固定*) Lambda 函數會在啟動後自動 AWS IoT Greengrass 啟動，並在自己的容器中繼續執行。這與*隨需* Lambda 函數相反，該函數會在叫用時啟動，並在沒有剩餘的任務執行時停止。如需詳細資訊，請參閱[Greengrass Lambda 函數的生命週期組態](lambda-functions.md#lambda-lifecycle)。

1. 選擇**新增 Lambda 函數**。

## 步驟 4：啟用串流管理員
<a name="stream-manager-console-enable-stream-manager"></a>

在此步驟中，您將確定串流管理員已啟用。

1. 在群組組態頁面上，選擇 **Lambda 函數**索引標籤。

1. 在 **System Lambda 函數**下，選取**串流管理員**，然後檢查狀態。如果是已停用，請選擇 **Edit (編輯)**。然後，選擇 **Enable (啟用)** 並 **Save (儲存)**。您可以使用本教學課程的預設參數設定。如需詳細資訊，請參閱[設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)。

**注意**  <a name="ggstreammanager-function-config-console"></a>
當您使用主控台啟用串流管理員並部署群組時，串流管理員的記憶體大小預設為 4194304 KB (4 GB)。我們建議您將記憶體大小設定為至少 128000 KB。

## 步驟 5：設定本機記錄
<a name="stream-manager-console-configure-logging"></a>

在此步驟中，您可以設定 群組中的 AWS IoT Greengrass 系統元件、使用者定義的 Lambda 函數和連接器，將日誌寫入核心裝置的檔案系統。您可以使用記錄檔針對可能遇到的任何問題進行疑難排解。如需詳細資訊，請參閱[使用 AWS IoT Greengrass 日誌監控](greengrass-logs-overview.md)。

1. <a name="shared-group-settings-local-logs-configuration"></a>在 **Local logs configuration (本機日誌組態)** 下，檢查是否已設定本機記錄。

1. <a name="shared-group-settings-local-logs-edit"></a>如果未針對 Greengrass 系統元件或使用者定義的 Lambda 函數設定日誌，請選擇**編輯**。

1. <a name="shared-group-settings-local-logs-event-source"></a>選擇**使用者 Lambda 函數日誌層級**和 **Greengrass 系統日誌層級**。

1. <a name="shared-group-settings-local-logs-save"></a>保留記錄層級和磁碟空間限制的預設值，然後選擇 **Save (儲存)**。

## 步驟 6：部署 Greengrass 群組
<a name="stream-manager-console-create-deployment"></a>

將群組部署到核心裝置。

1. <a name="shared-deploy-group-checkggc"></a>確定 AWS IoT Greengrass 核心正在執行。如果需要，請在您的 Raspberry Pi 終端機執行以下命令。

   1. 檢查精靈是否有在運作：

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      若輸出的 `root` 含有 `/greengrass/ggc/packages/ggc-version/bin/daemon` 項目，則精靈有在運作。
**注意**  
路徑中的版本取決於核心裝置上安裝的核心 AWS IoT Greengrass 軟體版本。

   1. 若要啟動協助程式：

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="shared-deploy-group-deploy"></a>在群組組態頁面上，選擇**部署**。

1. <a name="shared-deploy-group-ipconfig"></a>

   1. 在 **Lambda 函數**索引標籤**的系統 Lambda 函數**區段下，選取 **IP 偵測器**，然後選擇**編輯**。

   1. 在**編輯 IP 偵測器設定**對話方塊中，選取**自動偵測和覆寫 MQTT 代理程式端點**。

   1. 選擇**儲存**。

      這可讓裝置自動取得核心的連接資訊，例如 IP 位址、DNS、連接埠編號。建議自動偵測，但 AWS IoT Greengrass 也支援手動指定的端點。只會在第一次部署群組時收到復原方法的提示。
**注意**  
如果出現提示，請授予許可來建立 [Greengrass 服務角色](service-role.md)，並將其與目前 AWS 帳戶 中的 建立關聯 AWS 區域。此角色允許 AWS IoT Greengrass 存取 AWS 服務中的資源。

      此**部署**頁面會顯示部署時間戳記、版本 ID 和狀態。完成後，部署顯示的狀態應為**已完成**。

      如需故障診斷協助，請參閱[故障診斷 AWS IoT Greengrass](gg-troubleshooting.md)。

## 步驟 7：測試應用程式
<a name="stream-manager-console-test-application"></a>

`TransferStream` Lambda 函數會產生模擬的裝置資料。它會將資料寫入串流管理員匯出至目標 Kinesis 資料串流的串流。

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>在 Amazon Kinesis 主控台的 **Kinesis 資料串流**下，選擇 **MyKinesisStream**。
**注意**  
如果您在沒有目標 Kinesis 資料串流的情況下執行教學課程，[請檢查串流管理員的日誌檔案](stream-manager-cli.md#stream-manager-cli-logs) (`GGStreamManager`)。如果日誌檔案包含錯誤消息中的 `export stream MyKinesisStream doesn't exist`，則測試成功。此錯誤表示服務嘗試匯出至串流，但串流不存在。

1. <a name="stream-manager-howto-view-put-records"></a>在 **MyKinesisStream** 頁面上，選擇 **Monitoring (監控)**。如果測試成功，您應該會看到 **Put Records (Put 記錄)** 圖表中的資料。視您的連線而定，可能需要一分鐘才會顯示資料。
**重要**  
完成測試後，請刪除 Kinesis 資料串流以免產生更多費用。  
或者，執行下列命令來停止 Greengrass 協助程式。這樣可以防止核心傳送訊息，直到您準備好繼續測試為止。  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. 從核心移除 **TransferStream** Lambda 函數。

   1. <a name="console-gg-groups"></a>在 AWS IoT 主控台導覽窗格的**管理**下，展開 **Greengrass 裝置**，然後選擇**群組 (V1)**。

   1. 在 **Greengrass 群組**下，選擇您的群組。

   1. 在 **Lambdas** 頁面上，選擇 **TransferStream** 函數的省略符號 (**...**)，然後選擇**移除函數**。

   1. 從 **Actions (動作)** 中，選擇 **Deploy (部署)**。

若要檢視記錄資訊或針對串流問題進行疑難排解，請檢查 `TransferStream` 和 `GGStreamManager` 函數的記錄。您必須具有讀取檔案系統 AWS IoT Greengrass 日誌的`root`許可。
+ `TransferStream` 會將日誌項目寫入 `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`。
+ `GGStreamManager` 會將日誌項目寫入 `greengrass-root/ggc/var/log/system/GGStreamManager.log`。

如果您需要更多疑難排解資訊，您可以將 **User Lambda 日誌**的記錄[層級設定為](#stream-manager-console-configure-logging)**偵錯日誌**，然後再次部署群組。

## 另請參閱
<a name="stream-manager-console-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的資料串流](stream-manager.md)
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [使用 StreamManagerClient 搭配串流](work-with-streams.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)

# 將資料串流匯出至 AWS 雲端 (CLI)
<a name="stream-manager-cli"></a>

本教學課程說明如何使用 AWS CLI 來設定和部署已啟用串流管理員的 AWS IoT Greengrass 群組。群組包含使用者定義的 Lambda 函數，可寫入串流管理員中的串流，然後自動匯出至 AWS 雲端。

串流管理員可讓擷取、處理和匯出大量資料串流更有效率且更可靠。在本教學課程中，您會建立使用 IoT 資料的 `TransferStream` Lambda 函數。Lambda 函數使用 AWS IoT Greengrass Core SDK 在串流管理員中建立串流，然後讀取和寫入。串流管理員接著會將串流匯出至 Kinesis Data Streams。下圖顯示此工作流程。

![\[串流管理工作流程圖\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-scenario.png)


本教學課程的重點是展示使用者定義的 Lambda 函數如何使用 AWS IoT Greengrass Core SDK 中的 `StreamManagerClient` 物件與串流管理員互動。為了簡化，您為此教學課程建立的 Python Lambda 函數會產生模擬的裝置資料。

當您使用包含 中 Greengrass 命令的 AWS IoT Greengrass API AWS CLI來建立群組時，預設會停用串流管理員。若要在核心上啟用串流管理員，您可以[建立包含系統 Lambda 函數的函數定義版本](#stream-manager-cli-create-function-definition)，以及參考新函數定義版本的群組版本。 `GGStreamManager`然後您將部署群組。

## 先決條件
<a name="stream-manager-cli-prerequisites"></a>

為完成此教學課程您需要：<a name="stream-manager-howto-prereqs"></a>
+ Greengrass 群組和 Greengrass 核心 (v1.10 或更新版本）。如需如何建立 Greengrass 群組和核心的詳細資訊，請參閱 [入門 AWS IoT Greengrass](gg-gs.md)。入門教學課程也包含安裝 AWS IoT Greengrass 核心軟體的步驟。
**注意**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a>OpenWrt 發行版本不支援串流管理員。
+ 在核心裝置上安裝 Java 8 執行時間 (JDK 8)。<a name="install-java8-runtime-general"></a>
  + 針對以 Debian 為基礎的發行版本 (包括 Raspbian) 或以 Ubuntu 為基礎的發行版本，請執行下列命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 針對以 Red Hat 為基礎的發行版本 (包括 Amazon Linux)，請執行下列命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    如需詳細資訊，請參閱 OpenJDK 文件上的[如何下載和安裝預先建置的 OpenJDK 套件](https://openjdk.java.net/install/)。
+ AWS IoT Greengrass 適用於 Python 1.5.0 版或更新版本的核心 SDK。若要在適用於 Python 的 AWS IoT Greengrass 核心 SDK `StreamManagerClient`中使用 ，您必須：
  + 在核心裝置上安裝 Python 3.7 或更新版本。
  + 在 Lambda 函數部署套件中包含 SDK 及其相依性。本教學課程提供相關指示。
**提示**  
您可以搭配使用 `StreamManagerClient` 與 Java 或 NodeJS。如需範例程式碼，請參閱 GitHub 上的[AWS IoT Greengrass 適用於 Java](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) 的 Core SDK 和[AWS IoT Greengrass 適用於 Node.js 的 Core SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。
+ 在與 Greengrass 群組相同的 Amazon Kinesis Data Streams 中**MyKinesisStream**建立名為 AWS 區域 的目的地串流。如需詳細資訊，請參閱*《Amazon Kinesis 開發人員指南*》中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream)。
**注意**  
在本教學課程中，串流管理員會將資料匯出至 Kinesis Data Streams，這會向您的 收取費用 AWS 帳戶。如需定價的相關資訊，請參閱 [Kinesis Data Streams 定價](https://aws.amazon.com/kinesis/data-streams/pricing/)。  
若要避免產生費用，您可以執行本教學課程而不建立 Kinesis 資料串流。在此情況下，您會檢查日誌，以查看串流管理員嘗試將串流匯出至 Kinesis Data Streams。
+ 新增至 的 IAM 政策[Greengrass 群組角色](group-role.md)，允許對目標資料串流執行 `kinesis:PutRecords`動作，如下列範例所示：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }
  ```

------<a name="aws-cli-howto-prereqs"></a>
+ 在您電腦上 AWS CLI 安裝和設定的 。如需詳細資訊，請參閱*AWS Command Line Interface 《 使用者指南*》中的[安裝 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) 和[設定 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)。

   

  此教學課程中的範例命令是針對 Linux 和其他以 Unix 為基礎的系統所撰寫。如果您使用的是 Windows，請參閱[指定 AWS 命令列界面的參數值](https://docs.aws.amazon.com/cli/latest/userguide/cli-using-param.html)，以取得語法差異的詳細資訊。

  如果命令包含 JSON 字串，教學課程提供的範例會在單行上有 JSON。在某些系統上，使用此格式編輯和執行命令可能更有效率。

 

本教學課程所述以下高階執行步驟：

1. [建立 Lambda 函數部署套件](#stream-manager-cli-create-deployment-package)

1. [建立 Lambda 函數](#stream-manager-cli-create-function)

1. [建立函數定義和版本](#stream-manager-cli-create-function-definition)

1. [建立記錄器定義和版本](#stream-manager-cli-create-logger-definition)

1. [取得您核心定義版本的 ARN](#stream-manager-cli-get-core-definition-version-arn)

1. [建立群組版本](#stream-manager-cli-create-group-version)

1. [建立部署](#stream-manager-cli-create-deployment)

1. [測試應用程式。](#stream-manager-cli-test-application)

此教學課程需約 30 分鐘完成。

## 步驟 1：建立 Lambda 函數部署套件
<a name="stream-manager-cli-create-deployment-package"></a>

在此步驟中，您會建立包含 Python 函數程式碼和相依性的 Lambda 函數部署套件。您稍後在其中建立 Lambda 函數時上傳此套件 AWS Lambda。Lambda 函數使用 AWS IoT Greengrass 核心 SDK 來建立本機串流並與之互動。

**注意**  
 使用者定義的 Lambda 函數必須使用 [AWS IoT Greengrass 核心 SDK](lambda-functions.md#lambda-sdks-core) 與串流管理員互動。如需有關 Greengrass 串流管理員需求的詳細資訊，請參閱 [Greengrass 串流管理員需求](stream-manager.md#stream-manager-requirements)。

1.  下載[AWS IoT Greengrass 適用於 Python 的 核心 SDK](lambda-functions.md#lambda-sdks-core) 1.5.0 版或更新版本。

1. <a name="unzip-ggc-sdk"></a>解壓縮下載的封裝，以取得軟體開發套件。SDK 為 `greengrasssdk` 資料夾。

1. <a name="install-python-sdk-dependencies-stream-manager"></a>安裝套件相依性，以在 Lambda 函數部署套件中包含 開發套件。<a name="python-sdk-dependencies-stream-manager"></a>

   1. 前往包含 `requirements.txt` 檔案的軟體開發套件目錄。這個檔案會列出相依性。

   1. 安裝軟體開發套件相依性。例如，執行下列 `pip` 命令，將它們安裝在目前的目錄中：

      ```
      pip install --target . -r requirements.txt
      ```

1. 將以下 Python 程式碼函數儲存在名為 `transfer_stream.py` 的本機檔案中。
**提示**  
 如需使用 Java 和 NodeJS 的範例程式碼，請參閱 GitHub 上的[AWS IoT Greengrass 適用於 Java 的 Core SDK](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) 和[AWS IoT Greengrass 適用於 Node.js 的 Core SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. 將下列項目壓縮成名為 `transfer_stream_python.zip` 的檔案。這是您的 Lambda 函數部署套件。
   + **transfer\$1stream.py**。應用程式邏輯。
   + **greengrasssdk**。發佈 MQTT 訊息的 Python Greengrass Lambda 函數所需的程式庫。

     適用於 Python 的 AWS IoT Greengrass Core SDK 1.5.0 版或更新版本提供[串流管理員操作](work-with-streams.md)。
   + 您為適用於 Python AWS IoT Greengrass 的核心 SDK 安裝的相依性 （例如， `cbor2`目錄）。

   當您建立 `zip` 檔案時，請只包含這些項目，而非包含的資料夾。

## 步驟 2：建立 Lambda 函數
<a name="stream-manager-cli-create-function"></a>

1. <a name="cli-create-empty-lambda-role"></a>建立 IAM 角色，以便在建立函數時傳入角色 ARN。

------
#### [ JSON Expanded ]

   ```
   aws iam create-role --role-name Lambda_empty --assume-role-policy '{
       "Version": "2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "lambda.amazonaws.com"
               },
              "Action": "sts:AssumeRole"
           }
       ]
   }'
   ```

------
#### [ JSON Single-line ]

   ```
   aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17",		 	 	  "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
   ```

------
**注意**  
AWS IoT Greengrass 不會使用此角色，因為 Greengrass Lambda 函數的許可是在 Greengrass 群組角色中指定。本教學課程將建立空白角色。

1. <a name="cli-copy-lambda-role-arn"></a>從輸出複製 `Arn`。

1. 使用 AWS Lambda API 建立 `TransferStream`函數。以下命令假設 zip 檔是在目前的目錄。
   + 將 *role-arn* 取代為您複製`Arn`的 。

   ```
   aws lambda create-function \
   --function-name TransferStream \
   --zip-file fileb://transfer_stream_python.zip \
   --role role-arn \
   --handler transfer_stream.function_handler \
   --runtime python3.7
   ```

1. 發佈函數的一個版本。

   ```
   aws lambda publish-version --function-name TransferStream --description 'First version'
   ```

1. 建立發佈版本的別名。

   Greengrass 群組可以依別名 （建議） 或版本參考 Lambda 函數。使用別名可讓您更輕鬆地管理程式碼更新，因為您不必在更新函數程式碼時變更訂閱資料表或群組定義。反之，您只需將別名指向新的函數版本。

   ```
   aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
   ```
**注意**  
AWS IoT Greengrass 不支援 **\$1LATEST** 版本的 Lambda 別名。

1. 從輸出複製 `AliasArn`。您在設定 函數時使用此值 AWS IoT Greengrass。

現在您已準備好設定 的 函數 AWS IoT Greengrass。

## 步驟 3：建立函數定義和版本
<a name="stream-manager-cli-create-function-definition"></a>

此步驟會建立參考系統 `GGStreamManager` Lambda 函數和使用者定義 `TransferStream` Lambda 函數的函數定義版本。若要在使用 AWS IoT Greengrass API 時啟用串流管理員，您的函數定義版本必須包含 `GGStreamManager`函數。

1. 使用包含系統和使用者定義 Lambda 函數的初始版本來建立函數定義。

   下列定義版本使用預設[參數設定](configure-stream-manager.md)啟用串流管理員。若要設定自訂設定，您必須定義對應串流管理員參數的環境變數。如需範例，請參閱 [若要啟用、停用或設定串流管理員 (CLI)](configure-stream-manager.md#enable-stream-manager-cli)。針對省略的參數 AWS IoT Greengrass 使用預設設定。 `MemorySize` 應至少為 `128000`。 `Pinned` 必須設定為 `true`。
**注意**  
<a name="long-lived-lambda"></a>*長期 *（或*固定*) Lambda 函數會在啟動後自動 AWS IoT Greengrass 啟動，並在自己的容器中繼續執行。這與*隨需* Lambda 函數相反，該函數會在叫用時啟動，並在沒有剩餘的任務執行時停止。如需詳細資訊，請參閱[Greengrass Lambda 函數的生命週期組態](lambda-functions.md#lambda-lifecycle)。
   + 將 *arbitrary-function-id* 取代為函數的名稱，例如 **stream-manager**。
   + 將 *alias-arn* 取代為您為 `TransferStream` Lambda 函數建立別名時`AliasArn`複製的 。

    

------
#### [ JSON expanded ]

   ```
   aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{
       "Functions": [
           {
               "Id": "arbitrary-function-id",
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", 
               "FunctionConfiguration": {
                   "MemorySize": 128000,
                   "Pinned": true,
                   "Timeout": 3
               }
           },
           {
               "Id": "TransferStreamFunction",
               "FunctionArn": "alias-arn",
               "FunctionConfiguration": {
                   "Executable": "transfer_stream.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               }
           }
       ]
   }'
   ```

------
#### [ JSON single ]

   ```
   aws greengrass create-function-definition \
   --name MyGreengrassFunctions \
   --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
   ```

------
**注意**  
函數定義版本需要 `Timeout`，但 `GGStreamManager` 不會使用它。如需 `Timeout` 和其他群組層級設定的詳細資訊，請參閱 [使用群組特定的組態控制 Greengrass Lambda 函數的執行](lambda-group-config.md)。

1. 從輸出複製 `LatestVersionArn`。您可以使用這個值，將函數定義版本新增至您部署到核心的群組版本。

## 步驟 4：建立一個記錄器定義和版本
<a name="stream-manager-cli-create-logger-definition"></a>

設定群組的記錄設定。在本教學課程中，您會設定 AWS IoT Greengrass 系統元件、使用者定義的 Lambda 函數和連接器，將日誌寫入核心裝置的檔案系統。您可以使用記錄檔針對可能遇到的任何問題進行疑難排解。如需詳細資訊，請參閱[使用 AWS IoT Greengrass 日誌監控](greengrass-logs-overview.md)。

1. <a name="create-logger-definition"></a>建立包含最初版本的記錄器定義。

------
#### [ JSON Expanded ]

   ```
   aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{
       "Loggers": [
           {
               "Id": "1",
               "Component": "GreengrassSystem",
               "Level": "INFO",
               "Space": 10240,
               "Type": "FileSystem"
           },
           {
               "Id": "2",
               "Component": "Lambda",
               "Level": "INFO",
               "Space": 10240,
               "Type": "FileSystem"
           }
       ]
   }'
   ```

------
#### [ JSON Single-line ]

   ```
   aws greengrass create-logger-definition \
       --name "LoggingConfigs" \
       --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
   ```

------

1. <a name="copy-logger-definition-version-id"></a>從輸出複製記錄器定義的 `LatestVersionArn`。您可以使用這個值，將紀錄器定義版本新增至您部署到核心的群組版本。

## 步驟 5：取得您核心定義版本的 ARN
<a name="stream-manager-cli-get-core-definition-version-arn"></a>

取得核心定義版本的 ARN，以新增至新群組版本。若要部署群組版本，其必須參考包含一個核心的核心定義版本。

1. <a name="get-group-id-latestversion"></a>取得目標 Greengrass 群組 ID 和目標群組版本 ID。此程序假設這是最新的群組和群組版本。下列查詢會傳回最近建立的群組。

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   或者，您可以依名稱查詢。群組名稱不需要是唯一名稱，因此可能會傳回多個群組。

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**注意**  
<a name="find-group-ids-console"></a>您也可以在 AWS IoT 主控台中找到這些值。群組 ID 會顯示在群組的 **Settings (設定)** 頁面上。群組版本 IDs會顯示在群組的**部署**索引標籤上。

1. <a name="copy-target-group-id"></a>從輸出複製目標群組的 `Id`。部署群組時，您可以使用此 ID 取得核心定義版本。

1. <a name="copy-latest-group-version-id"></a>從輸出複製 `LatestVersion`，這是新增至群組的最新版本 ID。您可以使用此取得核心定義版本。

1. 取得核心定義版本的 ARN：

   1. 取得群組版本。
      + 將 *group-id* 取代為您為群組所複製的 `Id`。
      + 將 *group-version-id* 換成您為群組所複製的 `LatestVersion`。

      ```
      aws greengrass get-group-version \
      --group-id group-id \
      --group-version-id group-version-id
      ```

   1. 從輸出複製 `CoreDefinitionVersionArn`。您可以使用這個值，將核心定義版本新增至您部署到核心的群組版本。

## 步驟 6：建立群組版本
<a name="stream-manager-cli-create-group-version"></a>

現在，您可以開始建立群組版本，其中包含您要部署的實體。若要這樣做，您需要建立群組版本來參考每個元件類型的目標版本。在本教學課程中，您會包含核心定義版本、函數定義版本和記錄器定義版本。

1. 建立群組版本。
   + 將 *group-id* 取代為您為群組所複製的 `Id`。
   + 將 *core-definition-version-arn* 換成您為核心定義版本所複製的 `CoreDefinitionVersionArn`。
   + 將 *function-definition-version-arn* 取代`LatestVersionArn`為您為新函數定義版本複製的 。
   + 將 *logger-definition-version-arn* 取代`LatestVersionArn`為您為新記錄器定義版本複製的 。

   ```
   aws greengrass create-group-version \
   --group-id group-id \
   --core-definition-version-arn core-definition-version-arn \
   --function-definition-version-arn function-definition-version-arn \
   --logger-definition-version-arn logger-definition-version-arn
   ```

1. <a name="copy-group-version-id"></a>從輸出複製 `Version`。這是新群組版本的 ID。

## 步驟 7：建立部署
<a name="stream-manager-cli-create-deployment"></a>

將群組部署到核心裝置。

1. <a name="shared-deploy-group-checkggc"></a>確定 AWS IoT Greengrass 核心正在執行。如果需要，請在您的 Raspberry Pi 終端機執行以下命令。

   1. 檢查精靈是否有在運作：

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      若輸出的 `root` 含有 `/greengrass/ggc/packages/ggc-version/bin/daemon` 項目，則精靈有在運作。
**注意**  
路徑中的版本取決於核心裝置上安裝的核心 AWS IoT Greengrass 軟體版本。

   1. 若要啟動協助程式：

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="create-deployment"></a>建立 部署。
   + 將 *group-id* 取代為您為群組所複製的 `Id`。
   + 將 *group-version-id* 取代為您為新群組版本所複製的 `Version`。

   ```
   aws greengrass create-deployment \
   --deployment-type NewDeployment \
   --group-id group-id \
   --group-version-id group-version-id
   ```

1. <a name="copy-deployment-id"></a>從輸出複製 `DeploymentId`。

1. <a name="get-deployment-status"></a>取得部署狀態。
   + 將 *group-id* 取代為您為群組所複製的 `Id`。
   + 將 *deployment-id* 換成您為部署所複製的 `DeploymentId`。

   ```
   aws greengrass get-deployment-status \
   --group-id group-id \
   --deployment-id deployment-id
   ```

   如果狀態為 `Success`，則部署成功。如需故障診斷協助，請參閱[故障診斷 AWS IoT Greengrass](gg-troubleshooting.md)。

## 步驟 8：測試應用程式
<a name="stream-manager-cli-test-application"></a>

`TransferStream` Lambda 函數會產生模擬的裝置資料。它會將資料寫入串流管理員匯出至目標 Kinesis 資料串流的串流。

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>在 Amazon Kinesis 主控台的 **Kinesis 資料串流**下，選擇 **MyKinesisStream**。
**注意**  
如果您在沒有目標 Kinesis 資料串流的情況下執行教學課程，[請檢查串流管理員的日誌檔案](#stream-manager-cli-logs) (`GGStreamManager`)。如果日誌檔案包含錯誤消息中的 `export stream MyKinesisStream doesn't exist`，則測試成功。此錯誤表示服務嘗試匯出至串流，但串流不存在。

1. <a name="stream-manager-howto-view-put-records"></a>在 **MyKinesisStream** 頁面上，選擇 **Monitoring (監控)**。如果測試成功，您應該會看到 **Put Records (Put 記錄)** 圖表中的資料。視您的連線而定，可能需要一分鐘才會顯示資料。
**重要**  
完成測試後，請刪除 Kinesis 資料串流以免產生更多費用。  
或者，執行下列命令來停止 Greengrass 協助程式。這樣可以防止核心傳送訊息，直到您準備好繼續測試為止。  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. 從核心移除 **TransferStream** Lambda 函數。

   1. 按照 [步驟 6：建立群組版本](#stream-manager-cli-create-group-version) 以建立新群組版本。但移除 `create-group-version` 命令中的 `--function-definition-version-arn` 選項。或者，建立不包含 **TransferStream** Lambda 函數的函數定義版本。
**注意**  
透過從部署的群組版本省略系統 `GGStreamManager` Lambda 函數，您可以停用核心上的串流管理。

   1. 請依照 [步驟 7：建立部署](#stream-manager-cli-create-deployment) 以部署新的群組版本。

若要檢視記錄資訊或針對串流問題進行疑難排解，請檢查 `TransferStream` 和 `GGStreamManager` 函數的記錄。您必須具有讀取檔案系統 AWS IoT Greengrass 日誌的`root`許可。
+ `TransferStream` 會將日誌項目寫入 `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`。
+ `GGStreamManager` 會將日誌項目寫入 `greengrass-root/ggc/var/log/system/GGStreamManager.log`。

如果您需要更多疑難排解資訊，可以將 `Lambda` 記錄等級設定為 `DEBUG`，然後建立並部署新的群組版本。

## 另請參閱
<a name="stream-manager-cli-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的資料串流](stream-manager.md)
+ [使用 StreamManagerClient 搭配串流](work-with-streams.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)
+ <a name="see-also-iam-cli"></a>《 [AWS Identity and Access Management 命令參考》中的 (IAM)](https://docs.aws.amazon.com/cli/latest/reference/iam) *AWS CLI 命令*
+ <a name="see-also-lambda-cli"></a>[AWS Lambda 命令](https://docs.aws.amazon.com/cli/latest/reference/lambda)*AWS CLI 參考中的 命令*
+ <a name="see-also-gg-cli"></a>[AWS IoT Greengrass 命令](https://docs.aws.amazon.com/cli/latest/reference/greengrass/index.html)*AWS CLI 參考中的 命令*