

End of support notice: On October 7th, 2026, AWS will discontinue support for AWS IoT Greengrass Version 1. After October 7th, 2026, you will no longer be able to access AWS IoT Greengrass V1 resources. For more information, see [Migrate from AWS IoT Greengrass Version 1](https://docs.aws.amazon.com/greengrass/v2/developerguide/migrate-from-v1.html).

# Manage data streams on the AWS IoT Greengrass core
<a name="stream-manager"></a>

AWS IoT Greengrass stream manager makes it easier and more reliable to transfer high-volume IoT data to the AWS Cloud. Stream manager processes data streams locally and exports them to the AWS Cloud automatically. This feature integrates with common edge scenarios, such as machine learning (ML) inference, where data is processed and analyzed locally before being exported to the AWS Cloud or local storage destinations.

Stream manager simplifies application development. Your IoT applications can use a standardized mechanism to process high-volume streams and manage local data retention policies instead of building custom stream management functionality. IoT applications can read and write to streams. They can define policies for storage type, size, and data retention on a per-stream basis to control how stream manager processes and exports streams.

Stream manager is designed to work in environments with intermittent or limited connectivity. You can define bandwidth use, timeout behavior, and how stream data is handled when the core is connected or disconnected. For critical data, you can set priorities to control the order in which streams are exported to the AWS Cloud.

You can configure automatic exports to the AWS Cloud for storage or further processing and analysis. Stream manager supports exporting to the following AWS Cloud destinations.<a name="supported-export-destinations"></a>
+ Channels in AWS IoT Analytics. <a name="ita-export-destination"></a>AWS IoT Analytics lets you perform advanced analysis on your data to help make business decisions and improve machine learning models. For more information, see [What is AWS IoT Analytics?](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) in the *AWS IoT Analytics User Guide*.
+ Streams in Kinesis Data Streams. <a name="aks-export-destination"></a>Kinesis Data Streams is commonly used to aggregate high-volume data and load it into a data warehouse or map-reduce cluster. For more information, see [What is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html) in the *Amazon Kinesis Data Streams Developer Guide*.
+ Asset properties in AWS IoT SiteWise. <a name="itsw-export-destination"></a>AWS IoT SiteWise lets you collect, organize, and analyze data from industrial equipment at scale. For more information, see [What is AWS IoT SiteWise?](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) in the *AWS IoT SiteWise User Guide*.
+ Objects in Amazon S3. <a name="s3-export-destination"></a>You can use Amazon S3 to store and retrieve large amounts of data. For more information, see [What is Amazon S3?](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html) in the *Amazon Simple Storage Service Developer Guide*.

## Stream management workflow
<a name="stream-manager-workflow"></a>

Your IoT applications interact with stream manager through the AWS IoT Greengrass Core SDK. In a simple workflow, a user-defined Lambda function running on the Greengrass core consumes IoT data, such as time-series temperature and pressure metrics. The Lambda function might filter or compress the data and then call the AWS IoT Greengrass Core SDK to write the data to a stream in stream manager. Stream manager can export the stream to the AWS Cloud automatically, based on the policies defined for the stream. User-defined Lambda functions can also send data directly to local databases or storage repositories.

Your IoT applications can include multiple user-defined Lambda functions that read or write to streams. These local Lambda functions can read and write to streams to filter, aggregate, and analyze data locally. This makes it possible to respond quickly to local events and extract valuable information before the data is transferred from the core to cloud or local destinations.

An example workflow is shown in the following diagram.

![\[Diagram of the stream manager workflow.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-architecture.png)


To use stream manager, start by configuring stream manager parameters to define group-level runtime settings that apply to all streams on the Greengrass core. These customizable settings allow you to control how stream manager stores, processes, and exports streams based on your business need and environment constraints. For more information, see [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md).

After you configure stream manager, you can create and deploy your IoT applications. These are typically user-defined Lambda functions that use `StreamManagerClient` in the AWS IoT Greengrass Core SDK to create and interact with streams. During stream creation, the Lambda function defines per-stream policies, such as export destinations, priority, and persistence. For more information, including code snippets for `StreamManagerClient` operations, see [Use StreamManagerClient to work with streams](work-with-streams.md).

For tutorials that configure a simple workflow, see [Export data streams to the AWS Cloud (console)](stream-manager-console.md) or [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md).

## Requirements
<a name="stream-manager-requirements"></a>

The following requirements apply for using stream manager:
+ You must use AWS IoT Greengrass Core software v1.10 or later, with stream manager enabled. For more information, see [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md).

  <a name="stream-manager-not-supported-openwrt-para"></a>Stream manager is not supported on OpenWrt distributions.
+ The Java 8 runtime (JDK 8) must be installed on the core.<a name="install-java8-runtime-general"></a>
  + For Debian-based distributions (including Raspbian) or Ubuntu-based distributions, run the following command:

    ```
    sudo apt install openjdk-8-jdk
    ```
  + For Red Hat-based distributions (including Amazon Linux), run the following command:

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    For more information, see [How to download and install prebuilt OpenJDK packages](https://openjdk.java.net/install/) in the OpenJDK documentation.

   
+ Stream manager requires a minimum of 70 MB RAM in addition to your base AWS IoT Greengrass Core software. Your total memory requirement depends on your workload.

   
+ User-defined Lambda functions must use the [AWS IoT Greengrass Core SDK](lambda-functions.md#lambda-sdks-core) to interact with stream manager. The AWS IoT Greengrass Core SDK is available in several languages, but only the following versions support stream manager operations:<a name="streammanagerclient-sdk-versions"></a>
  + Java SDK (v1.4.0 or later)
  + Python SDK (v1.5.0 or later)
  + Node.js SDK (v1.6.0 or later)

  Download the version of the SDK that corresponds to your Lambda function runtime and include it in your Lambda function deployment package.
**Note**  
The AWS IoT Greengrass Core SDK for Python requires Python 3.7 or later and has other package dependencies. For more information, see [Create a Lambda function deployment package (console)](stream-manager-console.md#stream-manager-console-create-deployment-package) or [Create a Lambda function deployment package (CLI)](stream-manager-cli.md#stream-manager-cli-create-deployment-package).
+ If you define AWS Cloud export destinations for a stream, you must create your export targets and grant access permissions in the Greengrass group role. Depending on the destination, other requirements might also apply. For more information, see:<a name="export-destinations-links"></a>
  + [AWS IoT Analytics channels](stream-export-configurations.md#export-to-iot-analytics)
  + [Amazon Kinesis data streams](stream-export-configurations.md#export-to-kinesis)
  + [AWS IoT SiteWise asset properties](stream-export-configurations.md#export-to-iot-sitewise)
  + [Amazon S3 objects](stream-export-configurations.md#export-to-s3)

  You are responsible for maintaining these AWS Cloud resources.

## Data security
<a name="stream-manager-security"></a>

When you use stream manager, be aware of the following security considerations.

### Local data security
<a name="stream-manager-security-stream-data"></a>

AWS IoT Greengrass does not encrypt stream data at rest or in transit locally between components on the core device.
+ **Data at rest**. Stream data is stored locally in a storage directory on the Greengrass core. For data security, AWS IoT Greengrass relies on Unix file permissions and full-disk encryption, if enabled. You can use the optional [STREAM_MANAGER_STORE_ROOT_DIR](configure-stream-manager.md#STREAM_MANAGER_STORE_ROOT_DIR) parameter to specify the storage directory. If you change this parameter later to use a different storage directory, AWS IoT Greengrass does not delete the previous storage directory or its contents.

   
+ **Data in transit locally**. AWS IoT Greengrass does not encrypt stream data in local transit on the core between data sources, Lambda functions, the AWS IoT Greengrass Core SDK, and stream manager.

   
+ **Data in transit to the AWS Cloud**. Data streams exported by stream manager to the AWS Cloud use standard AWS service client encryption with Transport Layer Security (TLS).

For more information, see [Data encryption](data-encryption.md).

### Client authentication
<a name="stream-manager-security-client-authentication"></a>

Stream manager clients use the AWS IoT Greengrass Core SDK to communicate with stream manager. When client authentication is enabled, only Lambda functions in the Greengrass group can interact with streams in stream manager. When client authentication is disabled, any process running on the Greengrass core (such as [Docker containers](docker-app-connector.md)) can interact with streams in stream manager. You should disable authentication only if your business case requires it.

You use the [STREAM_MANAGER_AUTHENTICATE_CLIENT](configure-stream-manager.md#STREAM_MANAGER_AUTHENTICATE_CLIENT) parameter to set the client authentication mode. You can configure this parameter from the console or AWS IoT Greengrass API. Changes take effect after the group is deployed.



|   | Enabled | Disabled | 
| --- | --- | --- | 
| Parameter value | `true` (default and recommended) | `false` | 
| Allowed clients | User-defined Lambda functions in the Greengrass group | User-defined Lambda functions in the Greengrass group; other processes running on the Greengrass core device | 

## See also
<a name="stream-manager-see-also"></a>
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Use StreamManagerClient to work with streams](work-with-streams.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ [Export data streams to the AWS Cloud (console)](stream-manager-console.md)
+ [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md)

# Configure AWS IoT Greengrass stream manager
<a name="configure-stream-manager"></a>

On the AWS IoT Greengrass core, stream manager can store, process, and export IoT device data. Stream manager provides parameters that you use to configure group-level runtime settings. These settings apply to all streams on the Greengrass core. You can use the AWS IoT console or AWS IoT Greengrass API to configure stream manager settings. Changes take effect after the group is deployed.

**Note**  
After you configure stream manager, you can create and deploy IoT applications that run on the Greengrass core and interact with stream manager. These IoT applications are typically user-defined Lambda functions. For more information, see [Use StreamManagerClient to work with streams](work-with-streams.md).

## Stream manager parameters
<a name="stream-manager-parameters"></a>

Stream manager provides the following parameters that allow you to define group-level settings. All parameters are optional.

**Storage directory**  <a name="STREAM_MANAGER_STORE_ROOT_DIR"></a>
Parameter name: `STREAM_MANAGER_STORE_ROOT_DIR`  
The absolute path of the local directory used to store streams. This value must start with a forward slash (for example, `/data`).  
For information about securing stream data, see [Local data security](stream-manager.md#stream-manager-security-stream-data).  
Minimum AWS IoT Greengrass Core version: 1.10.0
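As a quick sanity check, you can verify that a candidate value meets the absolute-path requirement before you deploy. This is an illustrative sketch (the `is_valid_store_root` helper is hypothetical, not part of AWS IoT Greengrass):

```python
def is_valid_store_root(path: str) -> bool:
    """Return True if the value is an absolute path starting with a
    forward slash, as required for STREAM_MANAGER_STORE_ROOT_DIR."""
    return path.startswith("/")

print(is_valid_store_root("/data"))          # absolute path: acceptable
print(is_valid_store_root("data/streams"))   # relative path: rejected
```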

**Server port**  
Parameter name: `STREAM_MANAGER_SERVER_PORT`  
The local port number used to communicate with stream manager. The default is `8088`.  
Minimum AWS IoT Greengrass Core version: 1.10.0

**Authenticate client**  <a name="STREAM_MANAGER_AUTHENTICATE_CLIENT"></a>
Parameter name: `STREAM_MANAGER_AUTHENTICATE_CLIENT`  
Indicates whether clients must be authenticated to interact with stream manager. All interaction between clients and stream manager is controlled by the AWS IoT Greengrass Core SDK. This parameter determines which clients can call the AWS IoT Greengrass Core SDK to work with streams. For more information, see [Client authentication](stream-manager.md#stream-manager-security-client-authentication).  
Valid values are `true` or `false`. The default is `true` (recommended).  
+ `true`. Allows only Greengrass Lambda functions as clients. Lambda function clients use internal AWS IoT Greengrass core protocols to authenticate with the AWS IoT Greengrass Core SDK.
+ `false`. Allows any process that runs on the AWS IoT Greengrass core to be a client. Do not set to `false` unless your business case requires it. For example, set this value to `false` only if non-Lambda processes on the core device must communicate directly with stream manager, such as [Docker containers](docker-app-connector.md) running on the core.
Minimum AWS IoT Greengrass Core version: 1.10.0

**Maximum bandwidth**  
Parameter name: `STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH`  
The average maximum bandwidth (in kilobits per second) that can be used to export data. The default allows unlimited use of available bandwidth.  
Minimum AWS IoT Greengrass Core version: 1.10.0
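To reason about a bandwidth cap, you can estimate a lower bound on export time for a given payload. The sketch below is purely illustrative arithmetic (the helper name and the 10 MB / 2,000 kbps figures are hypothetical):

```python
def estimated_export_seconds(payload_bytes: int, max_bandwidth_kbps: int) -> float:
    """Rough lower bound on export time under a
    STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH cap (kilobits per second)."""
    payload_kilobits = payload_bytes * 8 / 1000
    return payload_kilobits / max_bandwidth_kbps

# Exporting 10 MB under a 2,000 kbps cap takes at least ~40 seconds.
print(round(estimated_export_seconds(10_000_000, 2000)))
```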

**Thread pool size**  
Parameter name: `STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE`  
The maximum number of active threads that can be used to export data. The default is `5`.  
The optimal size depends on your hardware, stream volume, and planned number of export streams. If your export speed is slow, you can adjust this setting to find the optimal size for your hardware and business case. The CPU and memory of your core device hardware are limiting factors. To start, you might try setting this value equal to the number of processor cores on the device.  
Be careful not to set a size that's higher than your hardware can support. Each stream consumes hardware resources, so you should try to limit the number of export streams on constrained devices.  
Minimum AWS IoT Greengrass Core version: 1.10.0
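The core-count starting point suggested above can be read off the device with a few lines of Python (the helper is hypothetical; treat the result as a starting value to tune, not a recommendation):

```python
import os

def suggested_thread_pool_size() -> int:
    """Starting point for STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE:
    one exporter thread per processor core, as suggested above."""
    return os.cpu_count() or 1  # cpu_count() can return None

print(suggested_thread_pool_size())
```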

**JVM arguments**  
Parameter name: `JVM_ARGS`  
Custom Java Virtual Machine arguments to pass to stream manager at startup. Multiple arguments should be separated by spaces.  
Use this parameter only when you must override the default settings used by the JVM. For example, you might need to increase the default heap size if you plan to export a large number of streams.  
Minimum AWS IoT Greengrass Core version: 1.10.0

**Read-only input file directories**  <a name="stream-manager-read-only-directories"></a>
Parameter name: `STREAM_MANAGER_READ_ONLY_DIRS`  
A comma-separated list of absolute paths to the directories outside of the root file system that store input files. Stream manager reads and uploads the files to Amazon S3 and mounts the directories as read-only. For more information about exporting to Amazon S3, see [Amazon S3 objects](stream-export-configurations.md#export-to-s3).  
Use this parameter only if the following conditions are true:  
+ The input file directory for a stream that exports to Amazon S3 is in one of the following locations:
  + A partition other than the root file system.
  + Under `/tmp` on the root file system.
+ The [default containerization](lambda-group-config.md#lambda-containerization-groupsettings) of the Greengrass group is **Greengrass container**.
Example value: `/mnt/directory-1,/mnt/directory-2,/tmp`  
Minimum AWS IoT Greengrass Core version: 1.11.0
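Because the value is a comma-separated list of absolute paths, a small validation sketch can catch malformed entries before deployment (the `parse_read_only_dirs` helper is hypothetical, not part of AWS IoT Greengrass):

```python
def parse_read_only_dirs(value: str) -> list:
    """Split a STREAM_MANAGER_READ_ONLY_DIRS value and verify that
    every entry is an absolute path."""
    dirs = [d.strip() for d in value.split(",") if d.strip()]
    for d in dirs:
        if not d.startswith("/"):
            raise ValueError("Not an absolute path: " + d)
    return dirs

print(parse_read_only_dirs("/mnt/directory-1,/mnt/directory-2,/tmp"))
```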

**Minimum size for multipart upload**  <a name="stream-manager-minimum-part-size"></a>
Parameter name: `STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES`  
The minimum size (in bytes) of a part in a multipart upload to Amazon S3. Stream manager uses this setting and the size of the input file to determine how to batch data in a multipart PUT request. The default and minimum value is `5242880` bytes (5 MB).  
Stream manager uses the stream's `sizeThresholdForMultipartUploadBytes` property to determine whether to export to Amazon S3 as a single or multipart upload. User-defined Lambda functions set this threshold when they create a stream that exports to Amazon S3. The default threshold is 5 MB.
Minimum AWS IoT Greengrass Core version: 1.11.0
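To see how the minimum part size affects batching, the following sketch computes how many parts a file of a given size would need. The batching logic itself is internal to stream manager; this only illustrates the arithmetic, and the helper name is hypothetical:

```python
import math

MIN_PART_SIZE_BYTES = 5_242_880  # default and minimum: 5 MB

def multipart_part_count(file_size_bytes: int,
                         part_size_bytes: int = MIN_PART_SIZE_BYTES) -> int:
    """Number of parts if a file is split into chunks of part_size_bytes."""
    return max(1, math.ceil(file_size_bytes / part_size_bytes))

# A 12 MB file at the 5 MB minimum part size needs 3 parts.
print(multipart_part_count(12_582_912))
```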

## Configure stream manager settings (console)
<a name="configure-stream-manager-console"></a>

You can use the AWS IoT console for the following management tasks:
+ [Check if stream manager is enabled](#check-stream-manager-console)
+ [Enable or disable stream manager during group creation](#enable-stream-manager-console-new-group)
+ [Enable or disable stream manager for an existing group](#enable-stream-manager-console-existing-group)
+ [Change stream manager settings](#change-stream-manager-console)

Changes take effect after the Greengrass group is deployed. For a tutorial that shows how to deploy a Greengrass group that contains a Lambda function that interacts with stream manager, see [Export data streams to the AWS Cloud (console)](stream-manager-console.md).

**Note**  <a name="ggstreammanager-function-config-console"></a>
When you use the console to enable stream manager and deploy the group, the memory size for stream manager is set to 4194304 KB (4 GB) by default. We recommend that you set the memory size to at least 128000 KB.

 

### To check if stream manager is enabled (console)
<a name="check-stream-manager-console"></a>

1. <a name="console-gg-groups"></a>In the AWS IoT console navigation pane, under **Manage**, expand **Greengrass devices**, and then choose **Groups (V1)**.

1. <a name="group-choose-target-group"></a>Choose the target group.

1. Choose the **Lambda functions** tab.

1. Under **System Lambda functions**, select **Stream manager**, and choose **Edit**.

1. Check the enabled or disabled status. Any custom stream manager settings that are configured are also displayed.

 

### To enable or disable stream manager during group creation (console)
<a name="enable-stream-manager-console-new-group"></a>

1. <a name="console-gg-groups"></a>In the AWS IoT console navigation pane, under **Manage**, expand **Greengrass devices**, and then choose **Groups (V1)**.

1. Choose **Create Group**. Your choice on the next page determines how you configure stream manager for the group.

1. Proceed through the **Name your Group** page and the page where you choose a **Greengrass core**.

1. Choose **Create group**.

1. On the group configuration page, choose the **Lambda functions** tab, select **Stream manager**, and choose **Edit**.
   + To enable stream manager with default settings, choose **Enable with default settings**.

      
   + To enable stream manager with custom settings, choose **Customize settings**.

     1. On the **Configure Stream manager** page, choose **Enable with custom settings**.

     1. Under **Custom settings**, enter values for stream manager parameters. For more information, see [Stream manager parameters](#stream-manager-parameters). Leave fields empty to allow AWS IoT Greengrass to use their default values.

         
   + To disable stream manager, choose **Disable**.

     1. On the **Configure stream manager** page, choose **Disable**.

         

1. Choose **Save**.

1. <a name="continue-create-group"></a>Continue through the remaining pages to create your group.

1. On the **Client devices** page, download your security resources, review the information, and then choose **Finish**.
**Note**  
When stream manager is enabled, you must [install the Java 8 runtime](stream-manager.md#stream-manager-requirements) on the core device before you deploy the group.

 

### To enable or disable stream manager for an existing group (console)
<a name="enable-stream-manager-console-existing-group"></a>

1. <a name="console-gg-groups"></a>In the AWS IoT console navigation pane, under **Manage**, expand **Greengrass devices**, and then choose **Groups (V1)**.

1. <a name="group-choose-target-group"></a>Choose the target group.

1. Choose the **Lambda functions** tab.

1. Under **System Lambda functions**, select **Stream manager**, and choose **Edit**.

1. Select the option to enable stream manager (with default or custom settings) or to disable it, and then choose **Save**.

 

### To change stream manager settings (console)
<a name="change-stream-manager-console"></a>

1. <a name="console-gg-groups"></a>In the AWS IoT console navigation pane, under **Manage**, expand **Greengrass devices**, and then choose **Groups (V1)**.

1. <a name="group-choose-target-group"></a>Choose the target group.

1. Choose the **Lambda functions** tab.

1. Under **System Lambda functions**, select **Stream manager**, and choose **Edit**.

1. Under **Custom settings**, edit the parameter values. For more information, see [Stream manager parameters](#stream-manager-parameters).

1. Choose **Save**.

## Configure stream manager settings (CLI)
<a name="configure-stream-manager-cli"></a>

In the AWS CLI, use the system `GGStreamManager` Lambda function to configure stream manager. System Lambda functions are components of the AWS IoT Greengrass Core software. For stream manager and some other system Lambda functions, you can configure Greengrass functionality by managing the corresponding `Function` and `FunctionDefinitionVersion` objects in the Greengrass group. For more information, see [Overview of the AWS IoT Greengrass group object model](deployments.md#api-overview).

You can use the API for the following management tasks. The examples in this section show how to use the AWS CLI, but you can also call the AWS IoT Greengrass API directly or use an AWS SDK.
+ [Check if stream manager is enabled](#check-stream-manager-cli)
+ [Enable, disable, or configure stream manager](#enable-stream-manager-cli)

Changes take effect after the group is deployed. For a tutorial that shows how to deploy a Greengrass group with a Lambda function that interacts with stream manager, see [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md).

**Tip**  
To see if stream manager is enabled and running from your core device, you can run the following command in a terminal on the device.  

```
ps aux | grep -i 'streammanager'
```

 

### To check if stream manager is enabled (CLI)
<a name="check-stream-manager-cli"></a>

Stream manager is enabled if your deployed function definition version includes the system `GGStreamManager` Lambda function. To check, do the following:

1. <a name="get-group-id-latestversion"></a>Get the IDs of the target Greengrass group and group version. This procedure assumes that this is the latest group and group version. The following query returns the most recently created group.

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   Or, you can query by name. Group names are not required to be unique, so multiple groups might be returned.

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**Note**  
<a name="find-group-ids-console"></a>You can also find these values in the AWS IoT console. The group ID is displayed on the group's **Settings** page. Group version IDs are displayed on the group's **Deployments** tab.

1. <a name="copy-group-id-latestversion"></a>Copy the `Id` and `LatestVersion` values from the target group in the output.

1. <a name="get-latest-group-version"></a>Get the latest group version.
   + Replace *group-id* with the `Id` that you copied.
   + Replace *latest-group-version-id* with the `LatestVersion` that you copied.

   ```
   aws greengrass get-group-version \
   --group-id group-id \
   --group-version-id latest-group-version-id
   ```

1. From the `FunctionDefinitionVersionArn` in the output, get the IDs of the function definition and function definition version.
   + The function definition ID is the GUID that follows the `functions` segment in the Amazon Resource Name (ARN).
   + The function definition version ID is the GUID that follows the `versions` segment in the ARN.

   ```
   arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/functions/function-definition-id/versions/function-definition-version-id
   ```

1. Get the function definition version.
   + Replace *function-definition-id* with the function definition ID.
   + Replace *function-definition-version-id* with the function definition version ID.

   ```
   aws greengrass get-function-definition-version \
   --function-definition-id function-definition-id \
   --function-definition-version-id function-definition-version-id
   ```

If the `functions` array in the output includes the `GGStreamManager` function, then stream manager is enabled. Any environment variables defined for the function represent custom settings for stream manager.
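The GUID extraction in steps 4 and 5 can be scripted. This sketch pulls both IDs out of a `FunctionDefinitionVersionArn` with a regular expression (the function name is hypothetical; the ARN is the placeholder example from this procedure):

```python
import re

ARN_PATTERN = re.compile(r"/functions/([^/]+)/versions/([^/]+)$")

def parse_function_definition_arn(arn: str):
    """Return (function definition ID, function definition version ID)
    from a FunctionDefinitionVersionArn."""
    match = ARN_PATTERN.search(arn)
    if match is None:
        raise ValueError("Unexpected ARN format: " + arn)
    return match.group(1), match.group(2)

arn = ("arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/"
       "functions/function-definition-id/versions/function-definition-version-id")
print(parse_function_definition_arn(arn))
```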

### To enable, disable, or configure stream manager (CLI)
<a name="enable-stream-manager-cli"></a>

In the AWS CLI, use the system `GGStreamManager` Lambda function to configure stream manager. Changes take effect after you deploy the group.
+ To enable stream manager, include `GGStreamManager` in the `functions` array of your function definition version. To configure custom settings, define environment variables for the corresponding [stream manager parameters](#stream-manager-parameters).
+ To disable stream manager, remove `GGStreamManager` from the `functions` array of your function definition version.

**Stream manager with default settings**  
The following example configuration enables stream manager with default settings. It sets the arbitrary function ID to `streamManager`.  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```
For the `FunctionConfiguration` properties, note the following:  
+ `MemorySize` is set to 4194304 KB (4 GB) with default settings. You can always change this value. We recommend that you set `MemorySize` to at least 128000 KB.
+ `Pinned` must be set to `true`.
+ `Timeout` is required by the function definition version, but `GGStreamManager` doesn't use it.
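Before you create the function definition version, you might sanity-check the `GGStreamManager` entry against these constraints locally. This validator is a hypothetical sketch, not an AWS tool:

```python
def validate_stream_manager_function(fn: dict) -> list:
    """Return a list of problems with a GGStreamManager function entry,
    based on the constraints described above."""
    cfg = fn.get("FunctionConfiguration", {})
    problems = []
    if cfg.get("Pinned") is not True:
        problems.append("Pinned must be true")
    if cfg.get("MemorySize", 0) < 128000:
        problems.append("MemorySize below the recommended 128000 KB")
    if "Timeout" not in cfg:
        problems.append("Timeout is required by the function definition version")
    return problems

entry = {
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {"MemorySize": 4194304, "Pinned": True, "Timeout": 3},
    "Id": "streamManager",
}
print(validate_stream_manager_function(entry))  # no problems: []
```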

**Stream manager with custom settings**  <a name="enable-stream-manager-custom-settings"></a>
The following example configuration enables stream manager with custom values for the storage directory, server port, and thread pool size parameters.  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "Environment": {
            "Variables": {
                "STREAM_MANAGER_STORE_ROOT_DIR": "/data",
                "STREAM_MANAGER_SERVER_PORT": "1234",
                "STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE": "4"
            }
        },
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```
AWS IoT Greengrass uses default values for [stream manager parameters](#stream-manager-parameters) that aren't specified as environment variables.

**Stream manager with custom settings for Amazon S3 exports**  <a name="enable-stream-manager-custom-settings-s3"></a>
The following example configuration enables stream manager with custom values for the upload directory and minimum multipart upload size parameters.  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "Environment": {
            "Variables": {
                "STREAM_MANAGER_READ_ONLY_DIRS": "/mnt/directory-1,/mnt/directory-2,/tmp",
                "STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES": "10485760"
            }
        },
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```

 

**To enable, disable, or configure stream manager (CLI)**

1. <a name="get-group-id-latestversion"></a>Get the IDs of the target Greengrass group and group version. This procedure assumes that this is the latest group and group version. The following query returns the most recently created group.

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   Or, you can query by name. Group names are not required to be unique, so multiple groups might be returned.

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**Note**  
<a name="find-group-ids-console"></a>You can also find these values in the AWS IoT console. The group ID is displayed on the group's **Settings** page. Group version IDs are displayed on the group's **Deployments** tab.

1. <a name="copy-group-id-latestversion"></a>Copy the `Id` and `LatestVersion` values from the target group in the output.

1. <a name="get-latest-group-version"></a>Get the latest group version.
   + Replace *group-id* with the `Id` that you copied.
   + Replace *latest-group-version-id* with the `LatestVersion` that you copied.

   ```
   aws greengrass get-group-version \
   --group-id group-id \
   --group-version-id latest-group-version-id
   ```

1. Copy the `CoreDefinitionVersionArn` and all other version ARNs from the output, except `FunctionDefinitionVersionArn`. You use these values later when you create a group version.

1. <a name="parse-function-def-id"></a>From the `FunctionDefinitionVersionArn` in the output, copy the ID of the function definition. The ID is the GUID that follows the `functions` segment in the ARN, as shown in the following example.

   ```
   arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/functions/bcfc6b49-beb0-4396-b703-6dEXAMPLEcu5/versions/0f7337b4-922b-45c5-856f-1aEXAMPLEsf6
   ```
**Note**  
Or, you can create a function definition by running the [create-function-definition](https://docs.aws.amazon.com/cli/latest/reference/greengrass/create-function-definition.html) command, and then copying the ID from the output.

1. <a name="enable-stream-manager-function-definition-version"></a>Add a function definition version to the function definition.
   + Replace *function-definition-id* with the `Id` that you copied for the function definition.
   + In the `functions` array, include all other functions that you want to make available on the Greengrass core. You can use the `get-function-definition-version` command to get the list of existing functions.

      
**Enable stream manager with default settings**  
The following example enables stream manager, by including the `GGStreamManager` function in the `functions` array. This example uses default values for [stream manager parameters](#stream-manager-parameters).  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
               "FunctionConfiguration": {
                   "MemorySize":  4194304,
                   "Pinned": true,
                   "Timeout": 3
               },
               "Id": "streamManager"
           },
           {    
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
        ]'
   ```
The `myLambdaFunction` function in the examples represents one of your user-defined Lambda functions.  
**Enable stream manager with custom settings**  
The following example enables stream manager by including the `GGStreamManager` function in the `functions` array. All stream manager settings are optional, unless you want to change the default values. This example shows how to use environment variables to set custom values.  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
               "FunctionConfiguration": {
                   "Environment": {
                       "Variables": {
                           "STREAM_MANAGER_STORE_ROOT_DIR": "/data",
                           "STREAM_MANAGER_SERVER_PORT": "1234",
                           "STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE": "4"
                       }
                   },
                   "MemorySize":  4194304,
                   "Pinned": true,
                   "Timeout": 3
               },
               "Id": "streamManager"
           },
           {    
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
        ]'
   ```
For the `FunctionConfiguration` properties, note the following:  
   + `MemorySize` is set to 4194304 KB (4 GB) in the default configuration. You can always change this value. We recommend that you set `MemorySize` to at least 128000 KB.
   + `Pinned` must be set to `true`.
   + `Timeout` is required by the function definition version, but `GGStreamManager` doesn't use it.  
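If you generate the `functions` payload from a script, you can keep these defaults and overrides consistent with a small helper. This is a hypothetical helper, not part of the AWS CLI or SDK; the keys mirror the JSON shown in the examples above:

```python
def stream_manager_function(env=None, memory_kb=4194304):
    """Build the GGStreamManager entry for the 'functions' array.
    'env' optionally maps STREAM_MANAGER_* variables to string values."""
    config = {
        "MemorySize": memory_kb,  # At least 128000 KB is recommended.
        "Pinned": True,           # Must be true for stream manager.
        "Timeout": 3,             # Required by the API, but unused.
    }
    if env:
        config["Environment"] = {"Variables": dict(env)}
    return {
        "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
        "FunctionConfiguration": config,
        "Id": "streamManager",
    }

# Custom settings: override the server port through an environment variable.
entry = stream_manager_function(env={"STREAM_MANAGER_SERVER_PORT": "1234"})
```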
**Disable stream manager**  
The following example omits the `GGStreamManager` function, which disables stream manager.  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {       
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
        ]'
   ```
If you don't want to deploy any Lambda functions, you can omit the function definition version entirely.

1. <a name="copy-function-def-version-arn"></a>Copy the `Arn` of the function definition version from the output.

1. <a name="create-group-version-with-sys-lambda"></a>Create a group version that contains the system Lambda function.
   + Replace *group-id* with the `Id` for the group.
   + Replace *core-definition-version-arn* with the `CoreDefinitionVersionArn` that you copied from the latest group version.
   + Replace *function-definition-version-arn* with the `Arn` that you copied for the new function definition version.
   + Replace the ARNs for other group components (for example, `SubscriptionDefinitionVersionArn` or `DeviceDefinitionVersionArn`) that you copied from the latest group version.
   + Remove any unused parameters. For example, remove the `--resource-definition-version-arn` if your group version doesn't contain any resources.

   ```
   aws greengrass create-group-version \
   --group-id group-id \
   --core-definition-version-arn core-definition-version-arn \
   --function-definition-version-arn function-definition-version-arn \
   --device-definition-version-arn device-definition-version-arn \
   --logger-definition-version-arn logger-definition-version-arn \
   --resource-definition-version-arn resource-definition-version-arn \
   --subscription-definition-version-arn subscription-definition-version-arn
   ```
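When scripting this step, you can apply the "remove any unused parameters" rule by filtering out missing ARNs before making the call. A minimal sketch, assuming you collect the arguments as keyword pairs (the helper name is illustrative):

```python
def group_version_args(**arns):
    """Keep only the component version ARNs that are set, so the
    create-group-version call omits unused parameters."""
    return {name: arn for name, arn in arns.items() if arn}

args = group_version_args(
    CoreDefinitionVersionArn="arn:aws:greengrass:example:core-version",
    FunctionDefinitionVersionArn="arn:aws:greengrass:example:function-version",
    ResourceDefinitionVersionArn=None,  # No resources in this group version.
)
# 'args' could then be passed to boto3's create_group_version(GroupId=..., **args).
```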

1. <a name="copy-group-version-id"></a>Copy the `Version` from the output. This is the ID of the new group version.

1. <a name="create-group-deployment"></a>Deploy the group with the new group version.
   + Replace *group-id* with the `Id` that you copied for the group.
   + Replace *group-version-id* with the `Version` that you copied for the new group version.

   ```
   aws greengrass create-deployment \
   --group-id group-id \
   --group-version-id group-version-id \
   --deployment-type NewDeployment
   ```

 

Follow this procedure again if you want to edit stream manager settings later. Make sure to create a function definition version that includes the `GGStreamManager` function with the updated configuration. The group version must reference all component version ARNs that you want to deploy to the core. Changes take effect after the group is deployed.

## See also
<a name="configure-stream-manager-see-also"></a>
+ [Manage data streams on the AWS IoT Greengrass core](stream-manager.md)
+ [Use StreamManagerClient to work with streams](work-with-streams.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ [Export data streams to the AWS Cloud (console)](stream-manager-console.md)
+ [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md)

# Use StreamManagerClient to work with streams
<a name="work-with-streams"></a>

User-defined Lambda functions running on the AWS IoT Greengrass core can use the `StreamManagerClient` object in the [AWS IoT Greengrass Core SDK](lambda-functions.md#lambda-sdks) to create streams in [stream manager](stream-manager.md) and then interact with the streams. When a Lambda function creates a stream, it defines the AWS Cloud destinations, prioritization, and other export and data retention policies for the stream. To send data to stream manager, Lambda functions append the data to the stream. If an export destination is defined for the stream, stream manager exports the stream automatically.

**Note**  
<a name="stream-manager-clients"></a>Typically, clients of stream manager are user-defined Lambda functions. If your business case requires it, you can also allow non-Lambda processes running on the Greengrass core (for example, a Docker container) to interact with stream manager. For more information, see [Client authentication](stream-manager.md#stream-manager-security-client-authentication).

The snippets in this topic show you how clients call `StreamManagerClient` methods to work with streams. For implementation details about the methods and their arguments, use the links to the SDK reference listed after each snippet. For tutorials that include a complete Python Lambda function, see [Export data streams to the AWS Cloud (console)](stream-manager-console.md) or [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md).

Your Lambda function should instantiate `StreamManagerClient` outside of the function handler. If instantiated in the handler, the function creates a `client` and connection to stream manager every time that it's invoked.

**Note**  
If you do instantiate `StreamManagerClient` in the handler, you must explicitly call the `close()` method when the `client` completes its work. Otherwise, the `client` keeps the connection open and another thread running until the script exits.
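The recommended pattern looks like the following sketch. The `ImportError` fallback stub exists only so the pattern can run outside a Greengrass core; in a real Lambda function you would import `StreamManagerClient` from the AWS IoT Greengrass Core SDK directly:

```python
try:
    from greengrasssdk.stream_manager import StreamManagerClient
except ImportError:
    class StreamManagerClient:  # Illustration-only stub when the SDK isn't installed.
        def close(self):
            pass

# Instantiate once at module scope, so every invocation of the
# handler reuses the same connection to stream manager.
client = StreamManagerClient()

def function_handler(event, context):
    # Use the shared client here; don't create a new one per invocation.
    return "ok"
```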

`StreamManagerClient` supports the following operations:
+ [Create message stream](#streammanagerclient-create-message-stream)
+ [Append message](#streammanagerclient-append-message)
+ [Read messages](#streammanagerclient-read-messages)
+ [List streams](#streammanagerclient-list-streams)
+ [Describe message stream](#streammanagerclient-describe-message-stream)
+ [Update message stream](#streammanagerclient-update-message-stream)
+ [Delete message stream](#streammanagerclient-delete-message-stream)

## Create message stream
<a name="streammanagerclient-create-message-stream"></a>

To create a stream, a user-defined Lambda function calls the create method and passes in a `MessageStreamDefinition` object. This object specifies the unique name for the stream and defines how stream manager should handle new data when the maximum stream size is reached. You can use `MessageStreamDefinition` and its data types (such as `ExportDefinition`, `StrategyOnFull`, and `Persistence`) to define other stream properties. These include:
+ The target AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise, and Amazon S3 destinations for automatic exports. For more information, see [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md).
+ Export priority. Stream manager exports higher priority streams before lower priority streams.
+ Maximum batch size and batch interval for AWS IoT Analytics, Kinesis Data Streams, and AWS IoT SiteWise destinations. Stream manager exports messages when either condition is met.
+ Time-to-live (TTL). The amount of time to guarantee that the stream data is available for processing. Make sure that the data can be consumed within this time period. This is not a deletion policy; the data might not be deleted immediately after the TTL period expires.
+ Stream persistence. Choose to save streams to the file system to persist data across core restarts or save streams in memory.
+ Starting sequence number. Specify the sequence number of the message to use as the starting message in the export.

For more information about `MessageStreamDefinition`, see the SDK reference for your target language:
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html) in the Java SDK
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html) in the Node.js SDK
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition) in the Python SDK

**Note**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` also provides a target destination you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable or supported for use in production environments.

After a stream is created, your Lambda functions can [append messages](#streammanagerclient-append-message) to the stream to send data for export and [read messages](#streammanagerclient-read-messages) from the stream for local processing. The number of streams that you create depends on your hardware capabilities and business case. One strategy is to create a stream for each target, such as an AWS IoT Analytics channel or a Kinesis data stream, though you can define multiple targets for a stream. A stream has a durable lifespan.

### Requirements
<a name="streammanagerclient-create-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

**Note**  
Creating streams with an AWS IoT SiteWise or Amazon S3 export destination has the following requirements:  
<a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

### Examples
<a name="streammanagerclient-create-message-stream-examples"></a>

The following snippet creates a stream named `StreamName`. It defines stream properties in the `MessageStreamDefinition` and subordinate data types.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS Cloud.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [create_message_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.create_message_stream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)  // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)  // Default is 16 MB.
                    .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                    .withPersistence(Persistence.File)  // Default is File.
                    .withFlushOnWrite(false)  // Default is false.
                    .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS Cloud.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3TaskExecutor(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456)  // Default is 256 MB.
                .withStreamSegmentSize(16777216)  // Default is 16 MB.
                .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                .withPersistence(Persistence.File)  // Default is File.
                .withFlushOnWrite(false)  // Default is false.
                .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                        .withIotSitewise(null)
                        .withS3TaskExecutor(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

For more information about configuring export destinations, see [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md).

 

## Append message
<a name="streammanagerclient-append-message"></a>

To send data to stream manager for export, your Lambda functions append the data to the target stream. The export destination determines the data type to pass to this method.

### Requirements
<a name="streammanagerclient-append-message-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

**Note**  
Appending messages with an AWS IoT SiteWise or Amazon S3 export destination has the following requirements:  
<a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

### Examples
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics or Kinesis Data Streams export destinations
<a name="streammanagerclient-append-message-blob"></a>

The following snippet appends a message to the stream named `StreamName`. For AWS IoT Analytics or Kinesis Data Streams destinations, your Lambda functions append a blob of data.

This snippet has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [append_message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise export destinations
<a name="streammanagerclient-append-message-sitewise"></a>

The following snippet appends a message to the stream named `StreamName`. For AWS IoT SiteWise destinations, your Lambda functions append a serialized `PutAssetPropertyValueEntry` object. For more information, see [Exporting to AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise).

**Note**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>When you send data to AWS IoT SiteWise, your data must meet the requirements of the `BatchPutAssetPropertyValue` action. For more information, see [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) in the *AWS IoT SiteWise API Reference*.

This snippet has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [append_message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) | [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.PutAssetPropertyValueEntry)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) | [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() * maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =  new PutAssetPropertyValueEntry()
            .withEntryId(`entry-${Date.now()}`) // Use a unique ID for each entry.
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) | [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)

------

#### Amazon S3 export destinations
<a name="streammanagerclient-append-message-export-task"></a>

The following snippet appends an export task to the stream named `StreamName`. For Amazon S3 destinations, your Lambda functions append a serialized `S3ExportTaskDefinition` object that contains information about the source input file and target Amazon S3 object. If the specified object doesn't exist, stream manager creates it for you. For more information, see [Exporting to Amazon S3](stream-export-configurations.md#export-streams-to-s3).

This snippet has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [append_message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) | [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) | [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        // Append an Amazon S3 export task definition and print the sequence number.
        const taskDefinition = new S3ExportTaskDefinition()
            .withBucket("BucketName")
            .withKey("KeyName")
            .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) | [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

 

## Read messages
<a name="streammanagerclient-read-messages"></a>

Read messages from a stream.

### Requirements
<a name="streammanagerclient-read-messages-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

### Examples
<a name="streammanagerclient-read-messages-examples"></a>

The following snippet reads messages from the stream named `StreamName`. The read method takes an optional `ReadMessagesOptions` object that specifies the sequence number to start reading from, the minimum and maximum numbers to read, and a timeout for reading messages.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_message_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [read_messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) | [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.ReadMessagesOptions)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the minimum message count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) | [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) | [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)

------

 

## List streams
<a name="streammanagerclient-list-streams"></a>

Get the list of streams in stream manager.

### Requirements
<a name="streammanagerclient-list-streams-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

### Examples
<a name="streammanagerclient-list-streams-examples"></a>

The following snippet gets a list of the streams (by name) in stream manager.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [list_streams](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [listStreams](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [listStreams](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

 

## Describe message stream
<a name="streammanagerclient-describe-message-stream"></a>

Get metadata about a stream, including the stream definition, size, and export status.

### Requirements
<a name="streammanagerclient-describe-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

### Examples
<a name="streammanagerclient-describe-message-stream-examples"></a>

The following snippet gets metadata about the stream named `StreamName`, including the stream's definition, size, and exporter statuses.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [describe_message_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

 

## Update message stream
<a name="streammanagerclient-update-message-stream"></a>

Update properties of an existing stream. You might want to update a stream if your requirements change after the stream was created. For example:
+ Add a new [export configuration](stream-export-configurations.md) for an AWS Cloud destination.
+ Increase the maximum size of a stream to change how data is exported or retained. For example, the stream's maximum size in combination with its strategy-on-full setting might cause data to be deleted or rejected before stream manager can process it.
+ Pause and resume exports; for example, if export tasks are long running and you want to ration your upload data.

Your Lambda functions follow this high-level process to update a stream:

1. [Get the description of the stream.](#streammanagerclient-describe-message-stream)

1. Update the target properties on the corresponding `MessageStreamDefinition` and subordinate objects.

1. Pass in the updated `MessageStreamDefinition`. Make sure to include the complete object definitions for the updated stream. Undefined properties revert to the default values.

   You can specify the sequence number of the message to use as the starting message in the export.

### Requirements
<a name="streammanagerclient-update-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0  |  Java: 1.5.0  |  Node.js: 1.7.0

### Examples
<a name="streammanagerclient-update-message-stream-examples"></a>

The following snippet updates the stream named `StreamName`. It updates multiple properties of a stream that exports to Kinesis Data Streams.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size = 536870912
    message_stream_info.definition.stream_segment_size = 33554432
    message_stream_info.definition.time_to_live_millis = 3600000
    message_stream_info.definition.strategy_on_full = StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence = Persistence.Memory
    message_stream_info.definition.flush_on_write = False
    # Update the export definition to add a Kinesis data stream configuration.
    message_stream_info.definition.export_definition.kinesis = [
        KinesisConfig(identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))
    ]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [update_message_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.update_message_stream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // The updated maximum size must be greater than or equal to the stream's current size.
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)  // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS Cloud.
                messageStreamInfo.getDefinition().getExportDefinition()
                    // Update the export definition to add a Kinesis data stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                    }})
            )
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await client.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // The updated maximum size must be greater than or equal to the stream's current size.
                .withMaxSize(536870912)  // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)  // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)  // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)  // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)  // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)  // Default is false. Updating to true.
                .withExportDefinition(  
                    // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### Constraints for updating streams
<a name="streammanagerclient-update-constraints"></a>

The following constraints apply when updating streams. Unless noted in the following list, updates take effect immediately.
+ You can't update a stream's persistence. To change this behavior, [delete the stream](#streammanagerclient-delete-message-stream) and [create a stream](#streammanagerclient-create-message-stream) that defines the new persistence policy.
+ You can update the maximum size of a stream only under the following conditions:
  + The maximum size must be greater than or equal to the current size of the stream. <a name="messagestreaminfo-describe-stream"></a>To find this information, [describe the stream](#streammanagerclient-describe-message-stream) and then check the storage status of the returned `MessageStreamInfo` object.
  + The maximum size must be greater than or equal to the stream's segment size.
+ You can update the stream segment size to a value less than the maximum size of the stream. The updated setting applies to new segments.
+ Updates to the time to live (TTL) property apply to new append operations. If you decrease this value, stream manager might also delete existing segments that exceed the TTL.
+ Updates to the strategy on full property apply to new append operations. If you set the strategy to overwrite the oldest data, stream manager might also overwrite existing segments based on the new setting.
+ Updates to the flush on write property apply to new messages.
+ Updates to export configurations apply to new exports. The update request must include all export configurations that you want to support. Otherwise, stream manager deletes them.
  + When you update an export configuration, specify the identifier of the target export configuration.
  + To add an export configuration, specify a unique identifier for the new export configuration.
  + To delete an export configuration, omit the export configuration.
+ To [update](#streammanagerclient-update-message-stream) the starting sequence number of an export configuration in a stream, you must specify a value that's less than the latest sequence number. <a name="messagestreaminfo-describe-stream"></a>To find this information, [describe the stream](#streammanagerclient-describe-message-stream) and then check the storage status of the returned `MessageStreamInfo` object.
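These size rules can be checked locally before you send the update request. The following plain-Python sketch (not part of the AWS IoT Greengrass Core SDK; the function and argument names are illustrative) mirrors the two size constraints:

```python
def validate_size_update(current_size_bytes, new_max_size, new_segment_size):
    """Check the documented constraints for updating a stream's size settings."""
    errors = []
    if new_max_size < current_size_bytes:
        errors.append("Maximum size must be greater than or equal to the stream's current size.")
    if new_max_size < new_segment_size:
        errors.append("Maximum size must be greater than or equal to the segment size.")
    return errors

# A stream currently holding 300 MB can grow to a 512 MB maximum with 32 MB segments.
assert validate_size_update(300 * 2**20, 512 * 2**20, 32 * 2**20) == []
# Shrinking the maximum below the current size violates the first constraint.
assert len(validate_size_update(300 * 2**20, 256 * 2**20, 32 * 2**20)) == 1
```

In practice, you would take `current_size_bytes` from the storage status that the describe operation returns.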

 

## Delete message stream
<a name="streammanagerclient-delete-message-stream"></a>

Deletes a stream. When you delete a stream, all of the stored data for the stream is deleted from the disk.

### Requirements
<a name="streammanagerclient-delete-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

### Examples
<a name="streammanagerclient-delete-message-stream-examples"></a>

The following snippet deletes the stream named `StreamName`.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [delete_message_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## See also
<a name="work-with-streams-see-also"></a>
+ [Manage data streams on the AWS IoT Greengrass core](stream-manager.md)
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ [Export data streams to the AWS Cloud (console)](stream-manager-console.md)
+ [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md)
+ `StreamManagerClient` in the AWS IoT Greengrass Core SDK reference:
  + [Python](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html)
  + [Java](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# Export configurations for supported AWS Cloud destinations
<a name="stream-export-configurations"></a>

User-defined Lambda functions use `StreamManagerClient` in the AWS IoT Greengrass Core SDK to interact with stream manager. When a Lambda function [creates a stream](work-with-streams.md#streammanagerclient-create-message-stream) or [updates a stream](work-with-streams.md#streammanagerclient-update-message-stream), it passes a `MessageStreamDefinition` object that represents stream properties, including the export definition. The `ExportDefinition` object contains the export configurations defined for the stream. Stream manager uses these export configurations to determine where and how to export the stream.

![\[Object model diagram of the ExportDefinition property type.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-exportconfigs.png)


You can define zero or more export configurations on a stream, including multiple export configurations for a single destination type. For example, you can export a stream to two AWS IoT Analytics channels and one Kinesis data stream.

If an export attempt fails, stream manager retries the export at intervals of up to five minutes. There is no maximum number of retry attempts.
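The exact retry schedule is internal to stream manager; the only documented bound is the five-minute maximum interval. A capped exponential backoff, sketched here in plain Python with illustrative names, shows how a retry policy with that bound behaves:

```python
def retry_delays(base_seconds=1.0, cap_seconds=300.0, attempts=12):
    """Yield exponentially growing retry delays, capped at five minutes (300 seconds)."""
    delay = base_seconds
    for _ in range(attempts):
        yield min(delay, cap_seconds)
        delay *= 2

# Delays grow 1, 2, 4, ... seconds, then hold at the 300-second cap.
delays = list(retry_delays())
```

Because there is no limit on the number of attempts, a real retry loop would keep yielding the capped delay until the export succeeds.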

**Note**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` also provides a target destination you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable or supported for use in production environments.

**Topics**
+ [AWS IoT Analytics channels](#export-to-iot-analytics)
+ [Amazon Kinesis data streams](#export-to-kinesis)
+ [AWS IoT SiteWise asset properties](#export-to-iot-sitewise)
+ [Amazon S3 objects](#export-to-s3)

You are responsible for maintaining these AWS Cloud resources.

## AWS IoT Analytics channels
<a name="export-to-iot-analytics"></a>

Stream manager supports automatic exports to AWS IoT Analytics. <a name="ita-export-destination"></a>AWS IoT Analytics lets you perform advanced analysis on your data to help make business decisions and improve machine learning models. For more information, see [What is AWS IoT Analytics?](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) in the *AWS IoT Analytics User Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `IoTAnalyticsConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTAnalyticsConfig) in the Python SDK
+ [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html) in the Java SDK
+ [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-iot-analytics-reqs"></a>

This export destination has the following requirements:
+ Target channels in AWS IoT Analytics must be in the same AWS account and AWS Region as the Greengrass group.
+ The [Greengrass group role](group-role.md) must allow the `iotanalytics:BatchPutMessage` permission to target channels. For example:

------
#### [ JSON ]

  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "iotanalytics:BatchPutMessage"
              ],
              "Resource": [
                  "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
                  "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

To create a stream that exports to AWS IoT Analytics, your Lambda functions [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `IoTAnalyticsConfig` objects. This object defines export settings, such as the target channel, batch size, batch interval, and priority.

When your Lambda functions receive data from devices, they [append messages](work-with-streams.md#streammanagerclient-append-message) that contain a blob of data to the target stream.

Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.
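The batch size and batch interval settings jointly determine when buffered messages are sent to the channel. The following plain-Python sketch (illustrative only; it is neither the stream manager implementation nor the SDK API) shows the flush logic that such settings imply:

```python
import time

class BatchingExporter:
    """Buffer messages and flush when the batch size is reached or the interval elapses."""

    def __init__(self, batch_size=10, batch_interval_s=60.0, clock=time.monotonic):
        self.batch_size = batch_size
        self.batch_interval_s = batch_interval_s
        self.clock = clock
        self.pending = []
        self.last_flush = clock()
        self.flushed_batches = []  # Stand-in for calls to the export destination.

    def append(self, message):
        self.pending.append(message)
        size_reached = len(self.pending) >= self.batch_size
        interval_elapsed = self.clock() - self.last_flush >= self.batch_interval_s
        if size_reached or interval_elapsed:
            self.flush()

    def flush(self):
        if self.pending:
            self.flushed_batches.append(self.pending)
            self.pending = []
        self.last_flush = self.clock()

exporter = BatchingExporter(batch_size=3)
for i in range(7):
    exporter.append(f"msg-{i}")
# Two full batches of three are flushed; one message is still buffered.
```

The priority setting in the export configuration determines the order in which streams with buffered data are exported when bandwidth is constrained.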

 

## Amazon Kinesis data streams
<a name="export-to-kinesis"></a>

Stream manager supports automatic exports to Amazon Kinesis Data Streams. <a name="aks-export-destination"></a>Kinesis Data Streams is commonly used to aggregate high-volume data and load it into a data warehouse or map-reduce cluster. For more information, see [What is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html) in the *Amazon Kinesis Data Streams Developer Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `KinesisConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.KinesisConfig) in the Python SDK
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html) in the Java SDK
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-kinesis-reqs"></a>

This export destination has the following requirements:
+ Target streams in Kinesis Data Streams must be in the same AWS account and AWS Region as the Greengrass group.
+ The [Greengrass group role](group-role.md) must allow the `kinesis:PutRecords` permission to target data streams. For example:

------
#### [ JSON ]

  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
                  "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
                  "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

To create a stream that exports to Kinesis Data Streams, your Lambda functions [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `KinesisConfig` objects. This object defines export settings, such as the target data stream, batch size, batch interval, and priority.

When your Lambda functions receive data from devices, they [append messages](work-with-streams.md#streammanagerclient-append-message) that contain a blob of data to the target stream. Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.

Stream manager generates a random UUID as the partition key for each record uploaded to Amazon Kinesis.
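Kinesis routes each record to a shard by interpreting the MD5 hash of its partition key as a 128-bit integer and matching it against each shard's hash key range. Because the partition key is a fresh random UUID per record, records spread roughly evenly across shards. This stdlib-only sketch demonstrates the routing behavior, assuming shards with evenly split hash key ranges:

```python
import hashlib
import uuid

def shard_for(partition_key, shard_count):
    """Map a partition key to a shard index: MD5 of the key as a
    128-bit integer, bucketed into equal hash key ranges."""
    hashed = int.from_bytes(hashlib.md5(partition_key.encode()).digest(), "big")
    range_size = 2**128 // shard_count
    return min(hashed // range_size, shard_count - 1)

# Random UUID partition keys land in every shard in roughly equal proportions.
counts = [0] * 4
for _ in range(1000):
    counts[shard_for(str(uuid.uuid4()), 4)] += 1
```

Note that because stream manager assigns the partition key itself, records are distributed for throughput rather than grouped by a key of your choosing.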

 

## AWS IoT SiteWise asset properties
<a name="export-to-iot-sitewise"></a>

Stream manager supports automatic exports to AWS IoT SiteWise. <a name="itsw-export-destination"></a>AWS IoT SiteWise lets you collect, organize, and analyze data from industrial equipment at scale. For more information, see [What is AWS IoT SiteWise?](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) in the *AWS IoT SiteWise User Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `IoTSiteWiseConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTSiteWiseConfig) in the Python SDK
+ [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html) in the Java SDK
+ [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html) in the Node.js SDK

**Note**  
AWS also provides the [IoT SiteWise connector](iot-sitewise-connector.md), which is a pre-built solution that you can use with OPC-UA sources.

### Requirements
<a name="export-to-iot-sitewise-reqs"></a>

This export destination has the following requirements:
+ Target asset properties in AWS IoT SiteWise must be in the same AWS account and AWS Region as the Greengrass group.
**Note**  
For the list of Regions that AWS IoT SiteWise supports, see [AWS IoT SiteWise endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region) in the *AWS General Reference*.
+ The [Greengrass group role](group-role.md) must allow the `iotsitewise:BatchPutAssetPropertyValue` permission to target asset properties. The following example policy uses the `iotsitewise:assetHierarchyPath` condition key to grant access to a target root asset and its children. You can remove the `Condition` from the policy to allow access to all of your AWS IoT SiteWise assets or specify ARNs of individual assets.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
               "Effect": "Allow",
               "Action": "iotsitewise:BatchPutAssetPropertyValue",
               "Resource": "*",
               "Condition": {
                   "StringLike": {
                       "iotsitewise:assetHierarchyPath": [
                           "/root node asset ID",
                           "/root node asset ID/*"
                       ]
                   }
               }
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

  For important security information, see [ BatchPutAssetPropertyValue authorization](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action) in the *AWS IoT SiteWise User Guide*.

### Exporting to AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

To create a stream that exports to AWS IoT SiteWise, your Lambda functions [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `IoTSiteWiseConfig` objects. This object defines export settings, such as the batch size, batch interval, and priority.

When your Lambda functions receive asset property data from devices, they append messages that contain the data to the target stream. Messages are JSON-serialized `PutAssetPropertyValueEntry` objects that contain property values for one or more asset properties. For more information, see [Append message](work-with-streams.md#streammanagerclient-append-message-sitewise) for AWS IoT SiteWise export destinations.

**Note**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>When you send data to AWS IoT SiteWise, your data must meet the requirements of the `BatchPutAssetPropertyValue` action. For more information, see [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) in the *AWS IoT SiteWise API Reference*.

Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.

 

You can adjust your stream manager settings and Lambda function logic to design your export strategy. For example:
+ For near-real-time exports, set a small batch size and a short batch interval, and append the data to the stream as it's received.
+ To optimize batching, mitigate bandwidth constraints, or minimize cost, your Lambda functions can pool the timestamp-quality-value (TQV) data points received for a single asset property before appending the data to the stream. One strategy is to batch entries for up to 10 different property-asset combinations, or property aliases, in one message instead of sending more than one entry for the same property. This helps stream manager to remain within [AWS IoT SiteWise quotas](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html).
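The pooling strategy above can be sketched as a plain-Python helper that groups TQV readings by property alias and packs up to 10 aliases into each message payload. The function name and dictionary field names are hypothetical; a real implementation would build the SDK's `PutAssetPropertyValueEntry` objects instead of dictionaries.

```python
from collections import defaultdict


def pool_tqv_entries(readings, max_aliases_per_message=10):
    """Group (alias, timestamp, quality, value) readings by property alias,
    then pack up to max_aliases_per_message aliases into each message payload."""
    by_alias = defaultdict(list)
    for alias, timestamp, quality, value in readings:
        # Pool all TQV points for the same property alias into one entry.
        by_alias[alias].append(
            {"timestamp": timestamp, "quality": quality, "value": value}
        )

    aliases = sorted(by_alias)
    messages = []
    # Batch entries for up to 10 different aliases per message.
    for i in range(0, len(aliases), max_aliases_per_message):
        batch = aliases[i:i + max_aliases_per_message]
        messages.append(
            [{"propertyAlias": a, "propertyValues": by_alias[a]} for a in batch]
        )
    return messages
```

For example, 25 aliases with one reading each produce three messages (10, 10, and 5 entries), and repeated readings for one alias collapse into a single entry with multiple values.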

 

## Amazon S3 objects
<a name="export-to-s3"></a>

Stream manager supports automatic exports to Amazon S3. <a name="s3-export-destination"></a>You can use Amazon S3 to store and retrieve large amounts of data. For more information, see [What is Amazon S3?](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html) in the *Amazon Simple Storage Service Developer Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `S3ExportTaskExecutorConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskExecutorConfig) in the Python SDK
+ [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html) in the Java SDK
+ [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-s3-reqs"></a>

This export destination has the following requirements:
+ Target Amazon S3 buckets must be in the same AWS account as the Greengrass group.
+ If the [default containerization](lambda-group-config.md#lambda-containerization-groupsettings) for the Greengrass group is **Greengrass container**, you must set the [STREAM_MANAGER_READ_ONLY_DIRS](configure-stream-manager.md#stream-manager-read-only-directories) parameter to use an input file directory that's under `/tmp` or isn't on the root file system.
+ If a Lambda function running in **Greengrass container** mode writes input files to the input file directory, you must create a local volume resource for the directory and mount the directory to the container with write permissions. This ensures that the files are written to the root file system and visible outside the container. For more information, see [Access local resources with Lambda functions and connectors](access-local-resources.md).
+ The [Greengrass group role](group-role.md) must allow the following permissions to the target buckets. For example:

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "s3:PutObject",
                  "s3:AbortMultipartUpload",
                  "s3:ListMultipartUploadParts"
              ],
              "Resource": [
                  "arn:aws:s3:::bucket-1-name/*",
                  "arn:aws:s3:::bucket-2-name/*"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to Amazon S3
<a name="export-streams-to-s3"></a>

To create a stream that exports to Amazon S3, your Lambda functions use the `S3ExportTaskExecutorConfig` object to configure the export policy. The policy defines export settings, such as the multipart upload threshold and priority. For Amazon S3 exports, stream manager uploads data that it reads from local files on the core device. To initiate an upload, your Lambda functions append an export task to the target stream. The export task contains information about the input file and target Amazon S3 object. Stream manager executes tasks in the sequence that they are appended to the stream.

**Note**  
<a name="bucket-not-key-must-exist"></a>The target bucket must already exist in your AWS account. If an object for the specified key doesn't exist, stream manager creates the object for you.

 This high-level workflow is shown in the following diagram.

![\[Diagram of the stream manager workflow for Amazon S3 exports.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-s3.png)


Stream manager uses the multipart upload threshold property, [minimum part size](configure-stream-manager.md#stream-manager-minimum-part-size) setting, and size of the input file to determine how to upload data. The multipart upload threshold must be greater than or equal to the minimum part size. If you want to upload data in parallel, you can create multiple streams.

The keys that specify your target Amazon S3 objects can include valid [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) strings in `!{timestamp:value}` placeholders. You can use these timestamp placeholders to partition data in Amazon S3 based on the time that the input file data was uploaded. For example, the following key name resolves to a value such as `my-key/2020/12/31/data.txt`.

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```
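Placeholder resolution can be illustrated in plain Python by mapping each pattern to its `strftime` equivalent. This sketch handles only the patterns used in the example key; the hypothetical `resolve_key` helper is for illustration, since stream manager performs this substitution for you.

```python
from datetime import datetime, timezone


def resolve_key(key_template, when):
    """Resolve the !{timestamp:...} placeholders used in the example key."""
    replacements = {
        "!{timestamp:YYYY}": when.strftime("%Y"),  # year
        "!{timestamp:MM}": when.strftime("%m"),    # month
        "!{timestamp:dd}": when.strftime("%d"),    # day of month
    }
    for placeholder, value in replacements.items():
        key_template = key_template.replace(placeholder, value)
    return key_template


print(resolve_key(
    "my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt",
    datetime(2020, 12, 31, tzinfo=timezone.utc),
))
# my-key/2020/12/31/data.txt
```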

**Note**  
If you want to monitor the export status for a stream, first create a status stream and then configure the export stream to use it. For more information, see [Monitor export tasks](#monitor-export-status-s3).

#### Manage input data
<a name="manage-s3-input-data"></a>

You can author code that IoT applications use to manage the lifecycle of the input data. The following example workflow shows how you might use Lambda functions to manage this data.

1. A local process receives data from devices or peripherals, and then writes the data to files in a directory on the core device. These are the input files for stream manager.
**Note**  
To determine if you must configure access to the input file directory, see the [STREAM_MANAGER_READ_ONLY_DIRS](configure-stream-manager.md#stream-manager-read-only-directories) parameter.  
The process that stream manager runs in inherits all of the file system permissions of the [default access identity](lambda-group-config.md#lambda-access-identity-groupsettings) for the group. Stream manager must have permission to access the input files. You can use the `chmod(1)` command to change the permission of the files, if necessary.

1. A Lambda function scans the directory and [appends an export task](work-with-streams.md#streammanagerclient-append-message-export-task) to the target stream when a new file is created. The task is a JSON-serialized `S3ExportTaskDefinition` object that specifies the URL of the input file, the target Amazon S3 bucket and key, and optional user metadata.

1. Stream manager reads the input file and exports the data to Amazon S3 in the order of appended tasks. <a name="bucket-not-key-must-exist"></a>The target bucket must already exist in your AWS account. If an object for the specified key doesn't exist, stream manager creates the object for you.

1. The Lambda function [reads messages](work-with-streams.md#streammanagerclient-read-messages) from a status stream to monitor the export status. After export tasks are completed, the Lambda function can delete the corresponding input files. For more information, see [Monitor export tasks](#monitor-export-status-s3).

### Monitor export tasks
<a name="monitor-export-status-s3"></a>

You can author code that IoT applications use to monitor the status of your Amazon S3 exports. Your Lambda functions must create a status stream and then configure the export stream to write status updates to the status stream. A single status stream can receive status updates from multiple streams that export to Amazon S3.

First, [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) to use as the status stream. You can configure the size and retention policies for the stream to control the lifespan of the status messages. For example:
+ Set `Persistence` to `Memory` if you don't want to store the status messages.
+ Set `StrategyOnFull` to `OverwriteOldestData` so that new status messages are not lost.

Then, create or update the export stream to use the status stream. Specifically, set the status configuration property of the stream’s `S3ExportTaskExecutorConfig` export configuration. This tells stream manager to write status messages about the export tasks to the status stream. In the `StatusConfig` object, specify the name of the status stream and the level of verbosity. The following supported values range from least verbose (`ERROR`) to most verbose (`TRACE`). The default is `INFO`.
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

 

The following example workflow shows how Lambda functions might use a status stream to monitor export status.

1. As described in the previous workflow, a Lambda function [appends an export task](work-with-streams.md#streammanagerclient-append-message-export-task) to a stream that's configured to write status messages about export tasks to a status stream. The append operation returns a sequence number that represents the task ID.

1. A Lambda function [reads messages](work-with-streams.md#streammanagerclient-read-messages) sequentially from the status stream, and then filters the messages based on the stream name and task ID or based on an export task property from the message context. For example, the Lambda function can filter by the input file URL of the export task, which is represented by the `S3ExportTaskDefinition` object in the message context.

   The following status codes indicate that an export task has reached a completed state:
   + `Success`. The upload was completed successfully.
   + `Failure`. Stream manager encountered an error, for example, the specified bucket does not exist. After resolving the issue, you can append the export task to the stream again.
   + `Canceled`. The task was aborted because the stream or export definition was deleted, or the time-to-live (TTL) period of the task expired.
**Note**  
The task might also have a status of `InProgress` or `Warning`. Stream manager issues warnings when an event returns an error that doesn't affect the execution of the task. For example, a failure to clean up an aborted partial upload returns a warning.

1. After export tasks are completed, the Lambda function can delete the corresponding input files.

The following example shows how a Lambda function might read and process status messages.

------
#### [ Python ]

```
import asyncio
import logging
import time

from greengrasssdk.stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
    StreamManagerException,
)
from greengrasssdk.stream_manager.util import Util

logger = logging.getLogger(__name__)

# file_url is the input file URL of the export task appended earlier (placeholder).
file_url = "file:/path/to/file.txt"

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [read\_messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) \| [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.StatusMessage)

------
#### [ Java ]

```
import java.util.List;

import com.amazonaws.greengrass.streammanager.client.GreengrassClientBuilder;
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.exception.StreamManagerException;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.Message;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        isS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        }
    } catch (InterruptedException e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \| [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require('aws-greengrass-core-sdk').StreamManager;

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await client.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Canceled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \| [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)

------

# Export data streams to the AWS Cloud (console)
<a name="stream-manager-console"></a>

This tutorial shows you how to use the AWS IoT console to configure and deploy an AWS IoT Greengrass group with stream manager enabled. The group contains a user-defined Lambda function that writes to a stream in stream manager, which is then exported automatically to the AWS Cloud.

Stream manager makes ingesting, processing, and exporting high-volume data streams more efficient and reliable. In this tutorial, you create a `TransferStream` Lambda function that consumes IoT data. The Lambda function uses the AWS IoT Greengrass Core SDK to create a stream in stream manager and then read and write to it. Stream manager then exports the stream to Kinesis Data Streams. The following diagram shows this workflow.

![\[Diagram of the stream management workflow.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-scenario.png)


The focus of this tutorial is to show how user-defined Lambda functions use the `StreamManagerClient` object in the AWS IoT Greengrass Core SDK to interact with stream manager. For simplicity, the Python Lambda function that you create for this tutorial generates simulated device data.

## Prerequisites
<a name="stream-manager-console-prerequisites"></a>

To complete this tutorial, you need:<a name="stream-manager-howto-prereqs"></a>
+ A Greengrass group and a Greengrass core (v1.10 or later). For information about how to create a Greengrass group and core, see [Getting started with AWS IoT Greengrass](gg-gs.md). The Getting Started tutorial also includes steps for installing the AWS IoT Greengrass Core software.
**Note**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a>Stream manager is not supported on OpenWrt distributions.
+ The Java 8 runtime (JDK 8) installed on the core device.<a name="install-java8-runtime-general"></a>
  + For Debian-based distributions (including Raspbian) or Ubuntu-based distributions, run the following command:

    ```
    sudo apt install openjdk-8-jdk
    ```
  + For Red Hat-based distributions (including Amazon Linux), run the following command:

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    For more information, see [ How to download and install prebuilt OpenJDK packages](https://openjdk.java.net/install/) in the OpenJDK documentation.
+ AWS IoT Greengrass Core SDK for Python v1.5.0 or later. To use `StreamManagerClient` in the AWS IoT Greengrass Core SDK for Python, you must:
  + Install Python 3.7 or later on the core device.
  + Include the SDK and its dependencies in your Lambda function deployment package. Instructions are provided in this tutorial.
**Tip**  
You can use `StreamManagerClient` with Java or NodeJS. For example code, see the [AWS IoT Greengrass Core SDK for Java](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) and [AWS IoT Greengrass Core SDK for Node.js](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) on GitHub.
+ A destination stream named **MyKinesisStream** created in Amazon Kinesis Data Streams in the same AWS Region as your Greengrass group. For more information, see [Create a stream](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream) in the *Amazon Kinesis Developer Guide*.
**Note**  
In this tutorial, stream manager exports data to Kinesis Data Streams, which results in charges to your AWS account. For information about pricing, see [Kinesis Data Streams pricing](https://aws.amazon.com/kinesis/data-streams/pricing/).  
To avoid incurring charges, you can run this tutorial without creating a Kinesis data stream. In this case, you check the logs to see that stream manager attempted to export the stream to Kinesis Data Streams.
+ An IAM policy added to the [Greengrass group role](group-role.md) that allows the `kinesis:PutRecords` action on the target data stream, as shown in the following example:

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }
  ```

------

The tutorial contains the following high-level steps:

1. [Create a Lambda function deployment package](#stream-manager-console-create-deployment-package)

1. [Create a Lambda function](#stream-manager-console-create-function)

1. [Add a function to the group](#stream-manager-console-create-gg-function)

1. [Enable stream manager](#stream-manager-console-enable-stream-manager)

1. [Configure local logging](#stream-manager-console-configure-logging)

1. [Deploy the group](#stream-manager-console-create-deployment)

1. [Test the application](#stream-manager-console-test-application)

The tutorial should take about 20 minutes to complete.

## Step 1: Create a Lambda function deployment package
<a name="stream-manager-console-create-deployment-package"></a>

In this step, you create a Lambda function deployment package that contains Python function code and dependencies. You upload this package later when you create the Lambda function in AWS Lambda. The Lambda function uses the AWS IoT Greengrass Core SDK to create and interact with local streams.

**Note**  
 Your user-defined Lambda functions must use the [AWS IoT Greengrass Core SDK](lambda-functions.md#lambda-sdks-core) to interact with stream manager. For more information about requirements for the Greengrass stream manager, see [Greengrass stream manager requirements](stream-manager.md#stream-manager-requirements). 

1.  Download the [AWS IoT Greengrass Core SDK for Python](lambda-functions.md#lambda-sdks-core) v1.5.0 or later.

1. <a name="unzip-ggc-sdk"></a>Unzip the downloaded package to get the SDK. The SDK is the `greengrasssdk` folder.

1. <a name="install-python-sdk-dependencies-stream-manager"></a>Install package dependencies to include with the SDK in your Lambda function deployment package.<a name="python-sdk-dependencies-stream-manager"></a>

   1. Navigate to the SDK directory that contains the `requirements.txt` file. This file lists the dependencies.

   1. Install the SDK dependencies. For example, run the following `pip` command to install them in the current directory:

      ```
      pip install --target . -r requirements.txt
      ```

1. Save the following Python code function in a local file named `transfer_stream.py`.
**Tip**  
 For example code that uses Java and NodeJS, see the [AWS IoT Greengrass Core SDK for Java](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) and [AWS IoT Greengrass Core SDK for Node.js](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) on GitHub.

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. Zip the following items into a file named `transfer_stream_python.zip`. This is your Lambda function deployment package.
   + **transfer\_stream.py**. App logic.
   + **greengrasssdk**. Required library for Python Greengrass Lambda functions. This tutorial uses it to interact with stream manager.

     [Stream manager operations](work-with-streams.md) are available in version 1.5.0 or later of the AWS IoT Greengrass Core SDK for Python.
   + The dependencies you installed for the AWS IoT Greengrass Core SDK for Python (for example, the `cbor2` directories).

   When you create the `zip` file, include only these items, not the containing folder.
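If you prefer to script the packaging step, the following sketch uses Python's standard `zipfile` module to build the archive with each item at the root of the zip (the helper name and the item list are illustrative, not part of the SDK):

```python
import os
import zipfile

def build_deployment_package(zip_path, items):
    """Zip files and folders so they sit at the root of the archive."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for item in items:
            if os.path.isdir(item):
                # Walk the folder and store entries relative to the current
                # directory, so the folder itself appears at the top level.
                for root, _dirs, files in os.walk(item):
                    for name in files:
                        full = os.path.join(root, name)
                        zf.write(full, os.path.relpath(full))
            else:
                zf.write(item, os.path.basename(item))

# Hypothetical usage; run from the directory that contains these items:
# build_deployment_package("transfer_stream_python.zip",
#                          ["transfer_stream.py", "greengrasssdk", "cbor2"])
```

Because the entries are stored relative to the current directory, the resulting zip contains `transfer_stream.py` and the `greengrasssdk` folder at the top level, not inside a containing folder.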

## Step 2: Create a Lambda function
<a name="stream-manager-console-create-function"></a>

In this step, you use the AWS Lambda console to create a Lambda function and configure it to use your deployment package. Then, you publish a function version and create an alias.

1. First, create the Lambda function.

   1. <a name="lambda-console-open"></a>In the AWS Management Console, choose **Services**, and open the AWS Lambda console.

   1. <a name="lambda-console-create-function"></a>Choose **Create function** and then choose **Author from scratch**.

   1. In the **Basic information** section, use the following values:
      + For **Function name**, enter **TransferStream**.
      + For **Runtime**, choose **Python 3.7**.
      + For **Permissions**, keep the default setting. This creates an execution role that grants basic Lambda permissions. This role isn't used by AWS IoT Greengrass.

   1. <a name="lambda-console-save-function"></a>At the bottom of the page, choose **Create function**.

1. Next, register the handler and upload your Lambda function deployment package.

   1. <a name="lambda-console-upload"></a>On the **Code** tab, under **Code source**, choose **Upload from**. From the dropdown, choose **.zip file**.  
![\[The Upload from dropdown with .zip file highlighted.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/lra-console/upload-deployment-package.png)

   1. Choose **Upload**, and then choose your `transfer_stream_python.zip` deployment package. Then, choose **Save**.

   1. <a name="lambda-console-runtime-settings-para"></a>On the **Code** tab for the function, under **Runtime settings**, choose **Edit**, and then enter the following values.
      + For **Runtime**, choose **Python 3.7**.
      + For **Handler**, enter **transfer\_stream.function\_handler**.

   1. <a name="lambda-console-save-config"></a>Choose **Save**.
**Note**  
The **Test** button on the AWS Lambda console doesn't work with this function. The AWS IoT Greengrass Core SDK doesn't contain modules that are required to run your Greengrass Lambda functions independently in the AWS Lambda console. These modules (for example, `greengrass_common`) are supplied to the functions after they are deployed to your Greengrass core.

1. Now, publish the first version of your Lambda function and create an [alias for the version](https://docs.aws.amazon.com/lambda/latest/dg/versioning-aliases.html).
**Note**  
Greengrass groups can reference a Lambda function by alias (recommended) or by version. Using an alias makes it easier to manage code updates because you don't have to change your subscription table or group definition when the function code is updated. Instead, you just point the alias to the new function version.

   1. <a name="shared-publish-function-version"></a>From the **Actions** menu, choose **Publish new version**.

   1. <a name="shared-publish-function-version-description"></a>For **Version description**, enter **First version**, and then choose **Publish**.

   1. On the **TransferStream: 1** configuration page, from the **Actions** menu, choose **Create alias**.

   1. On the **Create a new alias** page, use the following values:
      + For **Name**, enter **GG\_TransferStream**.
      + For **Version**, choose **1**.
**Note**  
AWS IoT Greengrass doesn't support Lambda aliases for **\$LATEST** versions.

   1. Choose **Create**.

Now you're ready to add the Lambda function to your Greengrass group.

## Step 3: Add a Lambda function to the Greengrass group
<a name="stream-manager-console-create-gg-function"></a>

In this step, you add the Lambda function to the group and then configure its lifecycle and environment variables. For more information, see [Controlling execution of Greengrass Lambda functions by using group-specific configuration](lambda-group-config.md).

1. <a name="console-gg-groups"></a>In the AWS IoT console navigation pane, under **Manage**, expand **Greengrass devices**, and then choose **Groups (V1)**.

1. <a name="group-choose-target-group"></a>Choose the target group.

1. <a name="choose-add-lambda"></a>On the group configuration page, choose the **Lambda functions** tab.

1. Under **My Lambda functions**, choose **Add**.

1. On the **Add Lambda function** page, choose the **Lambda function** for your Lambda function.

1. For the **Lambda version**, choose **Alias:GG\_TransferStream**.

   Now, configure properties that determine the behavior of the Lambda function in the Greengrass group.

1. In the **Lambda function configuration** section, make the following changes:
   + Set **Memory limit** to 32 MB.
   + For **Pinned**, choose **True**.
**Note**  
<a name="long-lived-lambda"></a>A *long-lived* (or *pinned*) Lambda function starts automatically after AWS IoT Greengrass starts and keeps running in its own container. This is in contrast to an *on-demand* Lambda function, which starts when invoked and stops when there are no tasks left to run. For more information, see [Lifecycle configuration for Greengrass Lambda functions](lambda-functions.md#lambda-lifecycle).

1. Choose **Add Lambda function**.

## Step 4: Enable stream manager
<a name="stream-manager-console-enable-stream-manager"></a>

In this step, you make sure that stream manager is enabled.

1. On the group configuration page, choose the **Lambda functions** tab.

1. Under **System Lambda functions**, select **Stream manager**, and check the status. If disabled, choose **Edit**. Then, choose **Enable** and **Save**. You can use the default parameter settings for this tutorial. For more information, see [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md).

**Note**  <a name="ggstreammanager-function-config-console"></a>
When you use the console to enable stream manager and deploy the group, the memory size for stream manager is set to 4194304 KB (4 GB) by default. We recommend that you set the memory size to at least 128000 KB.

## Step 5: Configure local logging
<a name="stream-manager-console-configure-logging"></a>

In this step, you configure AWS IoT Greengrass system components, user-defined Lambda functions, and connectors in the group to write logs to the file system of the core device. You can use logs to troubleshoot any issues you might encounter. For more information, see [Monitoring with AWS IoT Greengrass logs](greengrass-logs-overview.md).

1. <a name="shared-group-settings-local-logs-configuration"></a>Under **Local logs configuration**, check if local logging is configured.

1. <a name="shared-group-settings-local-logs-edit"></a>If logs aren't configured for Greengrass system components or user-defined Lambda functions, choose **Edit**.

1. <a name="shared-group-settings-local-logs-event-source"></a>Choose **User Lambda functions log level** and **Greengrass system log level**.

1. <a name="shared-group-settings-local-logs-save"></a>Keep the default values for logging level and disk space limit, and then choose **Save**.

## Step 6: Deploy the Greengrass group
<a name="stream-manager-console-create-deployment"></a>

Deploy the group to the core device.

1. <a name="shared-deploy-group-checkggc"></a>Make sure that the AWS IoT Greengrass core is running. Run the following commands in your Raspberry Pi terminal, as needed.

   1. To check whether the daemon is running:

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      If the output contains a `root` entry for `/greengrass/ggc/packages/ggc-version/bin/daemon`, then the daemon is running.
**Note**  
The version in the path depends on the AWS IoT Greengrass Core software version that's installed on your core device.

   1. To start the daemon:

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="shared-deploy-group-deploy"></a>On the group configuration page, choose **Deploy**.

1. <a name="shared-deploy-group-ipconfig"></a>

   1. In the **Lambda functions** tab, under the **System Lambda functions** section, select **IP detector** and choose **Edit**.

   1. In the **Edit IP detector settings** dialog box, select **Automatically detect and override MQTT broker endpoints**.

   1. Choose **Save**.

      This enables devices to automatically acquire connectivity information for the core, such as IP address, DNS, and port number. Automatic detection is recommended, but AWS IoT Greengrass also supports manually specified endpoints. You're only prompted for the discovery method the first time that the group is deployed.
**Note**  
If prompted, grant permission to create the [Greengrass service role](service-role.md) and associate it with your AWS account in the current AWS Region. This role allows AWS IoT Greengrass to access your resources in AWS services.

      The **Deployments** page shows the deployment timestamp, version ID, and status. When completed, the status displayed for the deployment should be **Completed**.

      For troubleshooting help, see [Troubleshooting AWS IoT Greengrass](gg-troubleshooting.md).

## Step 7: Test the application
<a name="stream-manager-console-test-application"></a>

The `TransferStream` Lambda function generates simulated device data. It writes data to a stream that stream manager exports to the target Kinesis data stream.

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>In the Amazon Kinesis console, under **Kinesis data streams**, choose **MyKinesisStream**.
**Note**  
If you ran the tutorial without a target Kinesis data stream, [check the log file](stream-manager-cli.md#stream-manager-cli-logs) for the stream manager (`GGStreamManager`). If it contains `export stream MyKinesisStream doesn't exist` in an error message, then the test is successful. This error means that the service tried to export to the stream but the stream doesn't exist.

1. <a name="stream-manager-howto-view-put-records"></a>On the **MyKinesisStream** page, choose **Monitoring**. If the test is successful, you should see data in the **Put Records** charts. Depending on your connection, it might take a minute before the data is displayed.
**Important**  
When you're finished testing, delete the Kinesis data stream to avoid incurring more charges.  
Or, run the following commands to stop the Greengrass daemon. This prevents the core from sending messages until you're ready to continue testing.  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. Remove the **TransferStream** Lambda function from the core.

   1. <a name="console-gg-groups"></a>In the AWS IoT console navigation pane, under **Manage**, expand **Greengrass devices**, and then choose **Groups (V1)**.

   1. Under **Greengrass groups**, choose your group.

   1. On the **Lambda functions** tab, choose the ellipses (**…**) for the **TransferStream** function, and then choose **Remove function**.

   1. From **Actions**, choose **Deploy**.

To view logging information or troubleshoot issues with streams, check the logs for the `TransferStream` and `GGStreamManager` functions. You must have `root` permissions to read AWS IoT Greengrass logs on the file system.
+ `TransferStream` writes log entries to `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`.
+ `GGStreamManager` writes log entries to `greengrass-root/ggc/var/log/system/GGStreamManager.log`.

If you need more troubleshooting information, you can [set the logging level](#stream-manager-console-configure-logging) for **User Lambda logs** to **Debug logs** and then deploy the group again.
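The sample function encodes each simulated reading as a 4-byte big-endian signed integer before appending it to the stream. When you consume the exported records from the Kinesis data stream, you can reverse that encoding. A minimal sketch (the function name is illustrative):

```python
def decode_reading(data: bytes) -> int:
    """Decode one record written by the sample function
    (a 4-byte big-endian signed integer)."""
    return int.from_bytes(data, byteorder="big", signed=True)

# Round trip with the same encoding that transfer_stream.py uses:
encoded = (982).to_bytes(length=4, signed=True, byteorder="big")
print(decode_reading(encoded))  # 982
```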

## See also
<a name="stream-manager-console-see-also"></a>
+ [Manage data streams on the AWS IoT Greengrass core](stream-manager.md)
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Use StreamManagerClient to work with streams](work-with-streams.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md)

# Export data streams to the AWS Cloud (CLI)
<a name="stream-manager-cli"></a>

This tutorial shows you how to use the AWS CLI to configure and deploy an AWS IoT Greengrass group with stream manager enabled. The group contains a user-defined Lambda function that writes to a stream in stream manager, which is then exported automatically to the AWS Cloud.

Stream manager makes ingesting, processing, and exporting high-volume data streams more efficient and reliable. In this tutorial, you create a `TransferStream` Lambda function that consumes IoT data. The Lambda function uses the AWS IoT Greengrass Core SDK to create a stream in stream manager and then read and write to it. Stream manager then exports the stream to Kinesis Data Streams. The following diagram shows this workflow.

![\[Diagram of the stream management workflow.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-scenario.png)


The focus of this tutorial is to show how user-defined Lambda functions use the `StreamManagerClient` object in the AWS IoT Greengrass Core SDK to interact with stream manager. For simplicity, the Python Lambda function that you create for this tutorial generates simulated device data.

When you use the AWS IoT Greengrass API, which includes the Greengrass commands in the AWS CLI, to create a group, stream manager is disabled by default. To enable stream manager on your core, you [create a function definition version](#stream-manager-cli-create-function-definition) that includes the system `GGStreamManager` Lambda function and a group version that references the new function definition version. Then you deploy the group.

## Prerequisites
<a name="stream-manager-cli-prerequisites"></a>

To complete this tutorial, you need:<a name="stream-manager-howto-prereqs"></a>
+ A Greengrass group and a Greengrass core (v1.10 or later). For information about how to create a Greengrass group and core, see [Getting started with AWS IoT Greengrass](gg-gs.md). The Getting Started tutorial also includes steps for installing the AWS IoT Greengrass Core software.
**Note**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a>Stream manager is not supported on OpenWrt distributions.
+ The Java 8 runtime (JDK 8) installed on the core device.<a name="install-java8-runtime-general"></a>
  + For Debian-based distributions (including Raspbian) or Ubuntu-based distributions, run the following command:

    ```
    sudo apt install openjdk-8-jdk
    ```
  + For Red Hat-based distributions (including Amazon Linux), run the following command:

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    For more information, see [How to download and install prebuilt OpenJDK packages](https://openjdk.java.net/install/) in the OpenJDK documentation.
+ AWS IoT Greengrass Core SDK for Python v1.5.0 or later. To use `StreamManagerClient` in the AWS IoT Greengrass Core SDK for Python, you must:
  + Install Python 3.7 or later on the core device.
  + Include the SDK and its dependencies in your Lambda function deployment package. Instructions are provided in this tutorial.
**Tip**  
You can use `StreamManagerClient` with Java or NodeJS. For example code, see the [AWS IoT Greengrass Core SDK for Java](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) and [AWS IoT Greengrass Core SDK for Node.js](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) on GitHub.
+ A destination stream named **MyKinesisStream** created in Amazon Kinesis Data Streams in the same AWS Region as your Greengrass group. For more information, see [Create a stream](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream) in the *Amazon Kinesis Developer Guide*.
**Note**  
In this tutorial, stream manager exports data to Kinesis Data Streams, which results in charges to your AWS account. For information about pricing, see [Kinesis Data Streams pricing](https://aws.amazon.com/kinesis/data-streams/pricing/).  
To avoid incurring charges, you can run this tutorial without creating a Kinesis data stream. In this case, you check the logs to see that stream manager attempted to export the stream to Kinesis Data Streams.
+ An IAM policy added to the [Greengrass group role](group-role.md) that allows the `kinesis:PutRecords` action on the target data stream, as shown in the following example:

------
#### [ JSON ]

  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
                  "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }

------<a name="aws-cli-howto-prereqs"></a>
+ The AWS CLI installed and configured on your computer. For more information, see [ Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) and [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) in the *AWS Command Line Interface User Guide*.

   

  The example commands in this tutorial are written for Linux and other Unix-based systems. If you're using Windows, see [Specifying parameter values for the AWS command line interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-using-param.html) for more information about differences in syntax.

  If the command contains a JSON string, the tutorial provides an example that has the JSON on a single line. On some systems, it might be more efficient to edit and run commands using this format.

 

The tutorial contains the following high-level steps:

1. [Create a Lambda function deployment package](#stream-manager-cli-create-deployment-package)

1. [Create a Lambda function](#stream-manager-cli-create-function)

1. [Create a function definition and version](#stream-manager-cli-create-function-definition)

1. [Create a logger definition and version](#stream-manager-cli-create-logger-definition)

1. [Get the ARN of your core definition version](#stream-manager-cli-get-core-definition-version-arn)

1. [Create a group version](#stream-manager-cli-create-group-version)

1. [Create a deployment](#stream-manager-cli-create-deployment)

1. [Test the application](#stream-manager-cli-test-application)

The tutorial should take about 30 minutes to complete.

## Step 1: Create a Lambda function deployment package
<a name="stream-manager-cli-create-deployment-package"></a>

In this step, you create a Lambda function deployment package that contains Python function code and dependencies. You upload this package later when you create the Lambda function in AWS Lambda. The Lambda function uses the AWS IoT Greengrass Core SDK to create and interact with local streams.

**Note**  
 Your user-defined Lambda functions must use the [AWS IoT Greengrass Core SDK](lambda-functions.md#lambda-sdks-core) to interact with stream manager. For more information about requirements for the Greengrass stream manager, see [Greengrass stream manager requirements](stream-manager.md#stream-manager-requirements). 

1.  Download the [AWS IoT Greengrass Core SDK for Python](lambda-functions.md#lambda-sdks-core) v1.5.0 or later.

1. <a name="unzip-ggc-sdk"></a>Unzip the downloaded package to get the SDK. The SDK is the `greengrasssdk` folder.

1. <a name="install-python-sdk-dependencies-stream-manager"></a>Install package dependencies to include with the SDK in your Lambda function deployment package.<a name="python-sdk-dependencies-stream-manager"></a>

   1. Navigate to the SDK directory that contains the `requirements.txt` file. This file lists the dependencies.

   1. Install the SDK dependencies. For example, run the following `pip` command to install them in the current directory:

      ```
      pip install --target . -r requirements.txt
      ```

1. Save the following Python code function in a local file named `transfer_stream.py`.
**Tip**  
 For example code that uses Java and NodeJS, see the [AWS IoT Greengrass Core SDK for Java](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) and [AWS IoT Greengrass Core SDK for Node.js](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) on GitHub.

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. Zip the following items into a file named `transfer_stream_python.zip`. This is your Lambda function deployment package.
   + **transfer\_stream.py**. App logic.
   + **greengrasssdk**. Required library for Python Greengrass Lambda functions. This tutorial uses it to interact with stream manager.

     [Stream manager operations](work-with-streams.md) are available in version 1.5.0 or later of the AWS IoT Greengrass Core SDK for Python.
   + The dependencies you installed for the AWS IoT Greengrass Core SDK for Python (for example, the `cbor2` directories).

   When you create the `zip` file, include only these items, not the containing folder.

## Step 2: Create a Lambda function
<a name="stream-manager-cli-create-function"></a>

1. <a name="cli-create-empty-lambda-role"></a>Create an IAM role so you can pass in the role ARN when you create the function.

------
#### [ JSON Expanded ]

   ```
   aws iam create-role --role-name Lambda_empty --assume-role-policy-document '{
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "lambda.amazonaws.com"
               },
              "Action": "sts:AssumeRole"
           }
       ]
   }'
   ```

------
#### [ JSON Single-line ]

   ```
   aws iam create-role --role-name Lambda_empty --assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'
   ```

------
**Note**  
AWS IoT Greengrass doesn't use this role because permissions for your Greengrass Lambda functions are specified in the Greengrass group role. For this tutorial, you create an empty role.

1. <a name="cli-copy-lambda-role-arn"></a>Copy the `Arn` from the output.

1. Use the AWS Lambda API to create the `TransferStream` function. The following command assumes that the zip file is in the current directory.
   + Replace *role-arn* with the `Arn` that you copied.

   ```
   aws lambda create-function \
   --function-name TransferStream \
   --zip-file fileb://transfer_stream_python.zip \
   --role role-arn \
   --handler transfer_stream.function_handler \
   --runtime python3.7
   ```

1. Publish a version of the function.

   ```
   aws lambda publish-version --function-name TransferStream --description 'First version'
   ```

1. Create an alias for the published version.

   Greengrass groups can reference a Lambda function by alias (recommended) or by version. Using an alias makes it easier to manage code updates because you don't have to change your subscription table or group definition when the function code is updated. Instead, you just point the alias to the new function version.

   ```
   aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
   ```
**Note**  
AWS IoT Greengrass doesn't support Lambda aliases for **\$LATEST** versions.

1. Copy the `AliasArn` from the output. You use this value when you configure the function for AWS IoT Greengrass.

Now you're ready to configure the function for AWS IoT Greengrass.

## Step 3: Create a function definition and version
<a name="stream-manager-cli-create-function-definition"></a>

This step creates a function definition version that references the system `GGStreamManager` Lambda function and your user-defined `TransferStream` Lambda function. To enable stream manager when you use the AWS IoT Greengrass API, your function definition version must include the `GGStreamManager` function.

1. Create a function definition with an initial version that contains the system and user-defined Lambda functions.

   The following definition version enables stream manager with default [parameter settings](configure-stream-manager.md). To configure custom settings, you must define environment variables for corresponding stream manager parameters. For an example, see [To enable, disable, or configure stream manager (CLI)](configure-stream-manager.md#enable-stream-manager-cli). AWS IoT Greengrass uses default settings for parameters that are omitted. `MemorySize` should be at least `128000`. `Pinned` must be set to `true`.
**Note**  
<a name="long-lived-lambda"></a>A *long-lived* (or *pinned*) Lambda function starts automatically after AWS IoT Greengrass starts and keeps running in its own container. This is in contrast to an *on-demand* Lambda function, which starts when invoked and stops when there are no tasks left to run. For more information, see [Lifecycle configuration for Greengrass Lambda functions](lambda-functions.md#lambda-lifecycle).
   + Replace *arbitrary-function-id* with a name for the function, such as **stream-manager**.
   + Replace *alias-arn* with the `AliasArn` that you copied when you created the alias for the `TransferStream` Lambda function.

    

------
#### [ JSON expanded ]

   ```
   aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{
       "Functions": [
           {
               "Id": "arbitrary-function-id",
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", 
               "FunctionConfiguration": {
                   "MemorySize": 128000,
                   "Pinned": true,
                   "Timeout": 3
               }
           },
           {
               "Id": "TransferStreamFunction",
               "FunctionArn": "alias-arn",
               "FunctionConfiguration": {
                   "Executable": "transfer_stream.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               }
           }
       ]
   }'
   ```

------
#### [ JSON single ]

   ```
   aws greengrass create-function-definition \
   --name MyGreengrassFunctions \
   --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
   ```

------
**Note**  
`Timeout` is required by the function definition version, but `GGStreamManager` doesn't use it. For more information about `Timeout` and other group-level settings, see [Controlling execution of Greengrass Lambda functions by using group-specific configuration](lambda-group-config.md).

1. Copy the `LatestVersionArn` from the output. You use this value to add the function definition version to the group version that you deploy to the core.
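If you script this step, the `--initial-version` payload can be assembled and sanity-checked before it is passed to the CLI. This is a hedged sketch: the `build_initial_version` helper and the example alias ARN are assumptions for illustration, not part of the Greengrass API.

```python
import json

def build_initial_version(alias_arn):
    """Assemble the Functions payload for create-function-definition."""
    functions = [
        {
            "Id": "stream-manager",
            "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
            "FunctionConfiguration": {"MemorySize": 128000, "Pinned": True, "Timeout": 3},
        },
        {
            "Id": "TransferStreamFunction",
            "FunctionArn": alias_arn,
            "FunctionConfiguration": {
                "Executable": "transfer_stream.function_handler",
                "MemorySize": 16000,
                "Pinned": True,
                "Timeout": 5,
            },
        },
    ]
    # Enforce the constraints this step calls out for GGStreamManager.
    sm = functions[0]["FunctionConfiguration"]
    if sm["MemorySize"] < 128000 or not sm["Pinned"]:
        raise ValueError("GGStreamManager needs MemorySize >= 128000 and Pinned = true")
    return json.dumps({"Functions": functions})

# Example alias ARN (placeholder account and region).
payload = build_initial_version(
    "arn:aws:lambda:us-west-2:123456789012:function:TransferStream:GG_TransferStream")
```

The resulting JSON string can be passed directly as the `--initial-version` argument.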

## Step 4: Create a logger definition and version
<a name="stream-manager-cli-create-logger-definition"></a>

Configure the group's logging settings. For this tutorial, you configure AWS IoT Greengrass system components, user-defined Lambda functions, and connectors to write logs to the file system of the core device. You can use logs to troubleshoot any issues you might encounter. For more information, see [Monitoring with AWS IoT Greengrass logs](greengrass-logs-overview.md).

1. <a name="create-logger-definition"></a>Create a logger definition that includes an initial version.

------
#### [ JSON expanded ]

   ```
   aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{
       "Loggers": [
           {
               "Id": "1",
               "Component": "GreengrassSystem",
               "Level": "INFO",
               "Space": 10240,
               "Type": "FileSystem"
           },
           {
               "Id": "2",
               "Component": "Lambda",
               "Level": "INFO",
               "Space": 10240,
               "Type": "FileSystem"
           }
       ]
   }'
   ```

------
#### [ JSON single ]

   ```
   aws greengrass create-logger-definition \
       --name "LoggingConfigs" \
       --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
   ```

------

1. <a name="copy-logger-definition-version-id"></a>Copy the `LatestVersionArn` of the logger definition from the output. You use this value to add the logger definition version to the group version that you deploy to the core.

## Step 5: Get the ARN of your core definition version
<a name="stream-manager-cli-get-core-definition-version-arn"></a>

Get the ARN of the core definition version to add to your new group version. To deploy a group version, it must reference a core definition version that contains exactly one core.

1. <a name="get-group-id-latestversion"></a>Get the IDs of the target Greengrass group and group version. This procedure assumes that this is the latest group and group version. The following query returns the most recently created group.

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   Or, you can query by name. Group names are not required to be unique, so multiple groups might be returned.

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**Note**  
<a name="find-group-ids-console"></a>You can also find these values in the AWS IoT console. The group ID is displayed on the group's **Settings** page. Group version IDs are displayed on the group's **Deployments** tab.

1. <a name="copy-target-group-id"></a>Copy the `Id` of the target group from the output. You use this to get the core definition version and when you deploy the group.

1. <a name="copy-latest-group-version-id"></a>Copy the `LatestVersion` from the output, which is the ID of the last version added to the group. You use this to get the core definition version.

1. Get the ARN of the core definition version:

   1. Get the group version.
      + Replace *group-id* with the `Id` that you copied for the group.
      + Replace *group-version-id* with the `LatestVersion` that you copied for the group.

      ```
      aws greengrass get-group-version \
      --group-id group-id \
      --group-version-id group-version-id
      ```

   1. Copy the `CoreDefinitionVersionArn` from the output. You use this value to add the core definition version to the group version that you deploy to the core.
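The `--query` expressions in step 1 use JMESPath. When scripting these steps, you can perform the same selection client-side. The following sketch mirrors `reverse(sort_by(Groups, &CreationTimestamp))[0]` on made-up group data; ISO 8601 timestamps sort correctly as plain strings:

```python
def latest_group(groups):
    # Equivalent of reverse(sort_by(Groups, &CreationTimestamp))[0]:
    # sort descending by creation time, take the first element.
    return sorted(groups, key=lambda g: g["CreationTimestamp"], reverse=True)[0]

# Made-up list-groups output for illustration.
groups = [
    {"Id": "a1b2", "Name": "MyGroup",
     "CreationTimestamp": "2020-01-10T12:00:00.000Z", "LatestVersion": "ver-old"},
    {"Id": "c3d4", "Name": "StreamManagerGroup",
     "CreationTimestamp": "2020-03-05T09:30:00.000Z", "LatestVersion": "ver-new"},
]

newest = latest_group(groups)
```

`newest["Id"]` and `newest["LatestVersion"]` correspond to the `Id` and `LatestVersion` values that you copy in steps 2 and 3.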

## Step 6: Create a group version
<a name="stream-manager-cli-create-group-version"></a>

Now you're ready to create a group version that references the target version of each component type that you want to deploy. For this tutorial, you include a core definition version, a function definition version, and a logger definition version.

1. Create a group version.
   + Replace *group-id* with the `Id` that you copied for the group.
   + Replace *core-definition-version-arn* with the `CoreDefinitionVersionArn` that you copied for the core definition version.
   + Replace *function-definition-version-arn* with the `LatestVersionArn` that you copied for your new function definition version.
   + Replace *logger-definition-version-arn* with the `LatestVersionArn` that you copied for your new logger definition version.

   ```
   aws greengrass create-group-version \
   --group-id group-id \
   --core-definition-version-arn core-definition-version-arn \
   --function-definition-version-arn function-definition-version-arn \
   --logger-definition-version-arn logger-definition-version-arn
   ```

1. <a name="copy-group-version-id"></a>Copy the `Version` from the output. This is the ID of the new group version.

## Step 7: Create a deployment
<a name="stream-manager-cli-create-deployment"></a>

Deploy the group to the core device.

1. <a name="shared-deploy-group-checkggc"></a>Make sure that the AWS IoT Greengrass core is running. Run the following commands in your Raspberry Pi terminal, as needed.

   1. To check whether the daemon is running:

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      If the output contains a `root` entry for `/greengrass/ggc/packages/ggc-version/bin/daemon`, then the daemon is running.
**Note**  
The version in the path depends on the AWS IoT Greengrass Core software version that's installed on your core device.

   1. To start the daemon:

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="create-deployment"></a>Create a deployment.
   + Replace *group-id* with the `Id` that you copied for the group.
   + Replace *group-version-id* with the `Version` that you copied for the new group version.

   ```
   aws greengrass create-deployment \
   --deployment-type NewDeployment \
   --group-id group-id \
   --group-version-id group-version-id
   ```

1. <a name="copy-deployment-id"></a>Copy the `DeploymentId` from the output.

1. <a name="get-deployment-status"></a>Get the deployment status.
   + Replace *group-id* with the `Id` that you copied for the group.
   + Replace *deployment-id* with the `DeploymentId` that you copied for the deployment.

   ```
   aws greengrass get-deployment-status \
   --group-id group-id \
   --deployment-id deployment-id
   ```

   If the status is `Success`, the deployment was successful. For troubleshooting help, see [Troubleshooting AWS IoT Greengrass](gg-troubleshooting.md).
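Deployments are asynchronous, so scripts typically poll `get-deployment-status` until it reaches a terminal state. A minimal sketch, with `fetch_status` standing in for the CLI or SDK call (it is an assumption for illustration, not a real API):

```python
import time

def wait_for_deployment(fetch_status, attempts=10, delay=0.0):
    """Poll until the deployment reaches a terminal state."""
    for _ in range(attempts):
        status = fetch_status()
        if status in ("Success", "Failure"):
            return status
        time.sleep(delay)  # not terminal yet; back off and retry
    raise TimeoutError("deployment did not finish in time")

# Simulated status sequence: two non-terminal polls, then success.
responses = iter(["Building", "InProgress", "Success"])
result = wait_for_deployment(lambda: next(responses), delay=0)
```

In a real script, `fetch_status` would run the `get-deployment-status` command and extract the status field from its output, and `delay` would be a few seconds.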

## Step 8: Test the application
<a name="stream-manager-cli-test-application"></a>

The `TransferStream` Lambda function generates simulated device data. It writes data to a stream that stream manager exports to the target Kinesis data stream.

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>In the Amazon Kinesis console, under **Kinesis data streams**, choose **MyKinesisStream**.
**Note**  
If you ran the tutorial without a target Kinesis data stream, [check the log file](#stream-manager-cli-logs) for the stream manager (`GGStreamManager`). If it contains `export stream MyKinesisStream doesn't exist` in an error message, the test is successful: the service tried to export to the stream, but the stream doesn't exist.

1. <a name="stream-manager-howto-view-put-records"></a>On the **MyKinesisStream** page, choose **Monitoring**. If the test is successful, you should see data in the **Put Records** charts. Depending on your connection, it might take a minute before the data is displayed.
**Important**  
When you're finished testing, delete the Kinesis data stream to avoid incurring more charges.  
Or, run the following commands to stop the Greengrass daemon. This prevents the core from sending messages until you're ready to continue testing.  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. Remove the **TransferStream** Lambda function from the core.

   1. Follow [Step 6: Create a group version](#stream-manager-cli-create-group-version) to create a new group version, but remove the `--function-definition-version-arn` option from the `create-group-version` command. Or, create a function definition version that doesn't include the **TransferStream** Lambda function.
**Note**  
By omitting the system `GGStreamManager` Lambda function from the deployed group version, you disable stream management on the core.

   1. Follow [Step 7: Create a deployment](#stream-manager-cli-create-deployment) to deploy the new group version.

To view logging information or troubleshoot issues with streams, check the logs for the `TransferStream` and `GGStreamManager` functions. You must have `root` permissions to read AWS IoT Greengrass logs on the file system.
+ `TransferStream` writes log entries to `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`.
+ `GGStreamManager` writes log entries to `greengrass-root/ggc/var/log/system/GGStreamManager.log`.
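For scripted log collection, the paths above can be assembled from the installation-specific values. The helper names and sample values below are hypothetical; the actual Greengrass root, region, and account ID depend on your core device:

```python
import posixpath  # core devices are Linux, so build POSIX-style paths

def transfer_stream_log(greengrass_root, region, account_id):
    # Matches the user-defined Lambda function log layout described above.
    return posixpath.join(greengrass_root, "ggc", "var", "log", "user",
                          region, account_id, "TransferStream.log")

def stream_manager_log(greengrass_root):
    # System component logs live under var/log/system.
    return posixpath.join(greengrass_root, "ggc", "var", "log", "system",
                          "GGStreamManager.log")

# Example values only.
user_log = transfer_stream_log("/greengrass", "us-west-2", "123456789012")
system_log = stream_manager_log("/greengrass")
```

Remember that reading these files on the core device requires `root` permissions.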

If you need more troubleshooting information, you can set the `Lambda` logging level to `DEBUG` and then create and deploy a new group version.

## See also
<a name="stream-manager-cli-see-also"></a>
+ [Manage data streams on the AWS IoT Greengrass core](stream-manager.md)
+ [Use StreamManagerClient to work with streams](work-with-streams.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Export data streams to the AWS Cloud (console)](stream-manager-console.md)
+ <a name="see-also-iam-cli"></a>[AWS Identity and Access Management (IAM) commands](https://docs.aws.amazon.com/cli/latest/reference/iam) in the *AWS CLI Command Reference*
+ <a name="see-also-lambda-cli"></a>[AWS Lambda commands](https://docs.aws.amazon.com/cli/latest/reference/lambda) in the *AWS CLI Command Reference*
+ <a name="see-also-gg-cli"></a>[AWS IoT Greengrass commands](https://docs.aws.amazon.com/cli/latest/reference/greengrass/index.html) in the *AWS CLI Command Reference*