

# Manage data streams on Greengrass core devices
<a name="manage-data-streams"></a>

AWS IoT Greengrass stream manager makes it more efficient and reliable to transfer high-volume IoT data to the AWS Cloud. Stream manager processes data streams on the AWS IoT Greengrass Core before it exports them to the AWS Cloud. Stream manager integrates with common edge scenarios, such as machine learning (ML) inference, where the AWS IoT Greengrass Core device processes and analyzes data before it exports the data to the AWS Cloud or local storage destinations.

Stream manager provides a common interface to simplify custom component development so that you don't need to build custom stream management functionality. Your components can use a standardized mechanism to process high-volume streams and manage local data retention policies. You can define policies for storage type, size, and data retention for each stream to control how stream manager processes and exports data.

Stream manager works in environments with intermittent or limited connectivity. You can define bandwidth use, timeout behavior, and how the AWS IoT Greengrass Core handles stream data when it is connected or disconnected. You can also set priorities to control the order in which the AWS IoT Greengrass Core exports streams to the AWS Cloud. This makes it possible for you to handle critical data sooner than other data.

You can configure stream manager to automatically export data to the AWS Cloud for storage or further processing and analysis. Stream manager supports exports to the following AWS Cloud destinations:
+ Channels in AWS IoT Analytics. AWS IoT Analytics lets you perform advanced analysis on your data to help make business decisions and improve machine learning models. For more information, see [What is AWS IoT Analytics?](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) in the *AWS IoT Analytics User Guide*.
+ Streams in Amazon Kinesis Data Streams. You can use Kinesis Data Streams to aggregate high-volume data and load it into a data warehouse or MapReduce cluster. For more information, see [What is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html) in the *Amazon Kinesis Data Streams Developer Guide*.
+ Asset properties in AWS IoT SiteWise. AWS IoT SiteWise lets you collect, organize, and analyze data from industrial equipment at scale. For more information, see [What is AWS IoT SiteWise?](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) in the *AWS IoT SiteWise User Guide*.
+ Objects in Amazon Simple Storage Service Amazon S3. You can use Amazon S3 to store and retrieve large amounts of data. For more information, see [What is Amazon S3?](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html) in the *Amazon Simple Storage Service Developer Guide*.

## Stream management workflow
<a name="stream-manager-workflow"></a>

Your IoT applications interact with stream manager through the Stream Manager SDK.

In a simple workflow, a component on the AWS IoT Greengrass core consumes IoT data, such as time-series temperature and pressure metrics. The component might filter or compress the data, and then call the Stream Manager SDK to write the data to a stream in stream manager. Stream manager can export the stream to the AWS Cloud automatically based on the policies that you define for the stream. Components can also send data directly to local databases or storage repositories.

Your IoT applications can include multiple custom components that read or write to streams. These components can read and write to streams to filter, aggregate, and analyze data on the AWS IoT Greengrass core device. This makes it possible to respond quickly to local events and extract valuable information before the data transfers from the core to the AWS Cloud or local destinations.

To get started, deploy the stream manager component to your AWS IoT Greengrass core device. In the deployment, configure the stream manager component parameters to define settings that apply to all streams on the Greengrass core device. Use these parameters to control how stream manager stores, processes, and exports streams based on your business needs and environment constraints. 

After you configure stream manager, you can create and deploy your IoT applications. These are typically custom components that use `StreamManagerClient` in the Stream Manager SDK to create and interact with streams. When you create a stream, you can define per-stream policies, such as export destinations, priority, and persistence. 

## Requirements
<a name="stream-manager-requirements"></a>

The following requirements apply for using stream manager:
+ Stream manager requires a minimum of 70 MB RAM in addition to the AWS IoT Greengrass Core software. Your total memory requirement depends on your workload.
+ AWS IoT Greengrass components must use the Stream Manager SDK to interact with stream manager. The Stream Manager SDK is available in the following languages :<a name="stream-manager-sdk-download-list"></a>
  + [Stream Manager SDK for Java](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-java/) (v1.1.0 or later)
  + [Stream Manager SDK for Node.js](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-js/) (v1.1.0 or later)
  + [Stream Manager SDK for Python](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/) (v1.1.0 or later)
+ AWS IoT Greengrass components must specify the stream manager component (`aws.greengrass.StreamManager`) as a dependency in their recipe to use stream manager.
**Note**  <a name="stream-manager-upgrade-note"></a>
If you use stream manager to export data to the cloud, you can't upgrade version 2.0.7 of the stream manager component to a version between v2.0.8 and v2.0.11. If you are deploying stream manager for the first time, we strongly recommend that you deploy the latest version of the stream manager component.
+ If you define AWS Cloud export destinations for a stream, you must create your export targets and grant access permissions in the [Greengrass device role](device-service-role.md). Depending on the destination, other requirements might also apply. For more information, see:<a name="export-destinations-links"></a>
  + [AWS IoT Analytics channels](stream-export-configurations.md#export-to-iot-analytics)
  + [Amazon Kinesis data streams](stream-export-configurations.md#export-to-kinesis)
  + [AWS IoT SiteWise asset properties](stream-export-configurations.md#export-to-iot-sitewise)
  + [Amazon S3 objects](stream-export-configurations.md#export-to-s3)

  You are responsible for maintaining these AWS Cloud resources.

## Data security
<a name="stream-manager-security"></a>

When you use stream manager, be aware of the following security considerations.

### Local data security
<a name="stream-manager-security-stream-data"></a>

AWS IoT Greengrass does not encrypt stream data at rest or in transit between local components on the core device.
+ **Data at rest**. Stream data is stored locally in a storage directory. For data security, AWS IoT Greengrass relies on file permissions and full-disk encryption, if enabled. You can use the optional [STREAM\$1MANAGER\$1STORE\$1ROOT\$1DIR](configure-stream-manager.md#STREAM_MANAGER_STORE_ROOT_DIR) parameter to specify the storage directory. If you change this parameter later to use a different storage directory, AWS IoT Greengrass does not delete the previous storage directory or its contents.
+ **Data in transit locally**. AWS IoT Greengrass does not encrypt stream data in local transit between data sources, AWS IoT Greengrass components, the Stream Manager SDK, and stream manager.
+ **Data in transit to the AWS Cloud**. Data streams exported by stream manager to the AWS Cloud use standard AWS service client encryption with Transport Layer Security (TLS).

### Client authentication
<a name="stream-manager-security-client-authentication"></a>

Stream manager clients use the Stream Manager SDK to communicate with stream manager. When client authentication is enabled, only Greengrass components can interact with streams in stream manager. When client authentication is disabled, any process running on the Greengrass core device can interact with streams in stream manager. You should disable authentication only if your business case requires it.

You use the [STREAM\$1MANAGER\$1AUTHENTICATE\$1CLIENT](configure-stream-manager.md#STREAM_MANAGER_AUTHENTICATE_CLIENT) parameter to set the client authentication mode. You can configure this parameter when you deploy the stream manager component to core devices.


****  

|   | Enabled | Disabled | 
| --- | --- | --- | 
| Parameter value | `true` (default and recommended) | `false` | 
| Allowed clients | Greengrass components on the core device | Greengrass components on the core device Other processes running on the Greengrass core device | 

## See also
<a name="stream-manager-see-also"></a>
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Use StreamManagerClient to work with streams](work-with-streams.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)

# Configure AWS IoT Greengrass stream manager
<a name="configure-stream-manager"></a>

On Greengrass core devices, stream manager can store, process, and export IoT device data. Stream manager provides parameters that you use to configure runtime settings. These settings apply to all streams on the Greengrass core device. You can use the AWS IoT Greengrass console or API to configure stream manager settings when you deploy the component. Changes take effect after the deployment completes.

## Stream manager parameters
<a name="stream-manager-parameters"></a>

Stream manager provides the following parameters that you can configure when you deploy the component to your core devices. All parameters are optional.

**Storage directory**  <a name="STREAM_MANAGER_STORE_ROOT_DIR"></a>
Parameter name: `STREAM_MANAGER_STORE_ROOT_DIR`  
The absolute path of the local folder used to store streams. This value must start with a forward slash (for example, `/data`).  
<a name="stream-manager-store-root-dir-parameter-folder-requirements"></a>You must specify an existing folder, and the [system user who runs the stream manager component](configure-greengrass-core-v2.md#configure-component-user) must have permissions to read and write to this folder. For example, you can run the following commands to create and configure a folder, `/var/greengrass/streams`, which you specify as the stream manager root folder. These commands allow the default system user, `ggc_user`, to read and write to this folder.  

```
sudo mkdir /var/greengrass/streams
sudo chown ggc_user /var/greengrass/streams
sudo chmod 700 /var/greengrass/streams
```
For information about securing stream data, see [Local data security](manage-data-streams.md#stream-manager-security-stream-data).  
Default: `/greengrass/v2/work/aws.greengrass.StreamManager`

**Server port**  
Parameter name: `STREAM_MANAGER_SERVER_PORT`  
The local port number used to communicate with stream manager. The default is `8088`.  
You can specify `0` to use a random available port.

**Authenticate client**  <a name="STREAM_MANAGER_AUTHENTICATE_CLIENT"></a>
Parameter name: `STREAM_MANAGER_AUTHENTICATE_CLIENT`  
Indicates whether clients must be authenticated to interact with stream manager. All interaction between clients and stream manager is controlled by the Stream Manager SDK. This parameter determines which clients can call the Stream Manager SDK to work with streams. For more information, see [Client authentication](manage-data-streams.md#stream-manager-security-client-authentication).  
Valid values are `true` or `false`. The default is `true` (recommended).  
+ `true`. Allows only Greengrass components as clients. Components use internal AWS IoT Greengrass Core protocols to authenticate with the Stream Manager SDK.
+ `false`. Allows any process that runs on the AWS IoT Greengrass Core to be a client. Do not set the value to `false` unless your business case requires it. For example, use `false` only if non-component processes on the core device must communicate directly with stream manager.

**Maximum bandwidth**  
Parameter name: `STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH`  
The average maximum bandwidth (in kilobits per second) that can be used to export data. The default allows unlimited use of available bandwidth.

**Thread pool size**  
Parameter name: `STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE`  
The maximum number of active threads that can be used to export data. The default is `5`.  
The optimal size depends on your hardware, stream volume, and planned number of export streams. If your export speed is slow, you can adjust this setting to find the optimal size for your hardware and business case. The CPU and memory of your core device hardware are limiting factors. To start, you might try setting this value equal to the number of processor cores on the device.  
Be careful not to set a size that's higher than your hardware can support. Each stream consumes hardware resources, so try to limit the number of export streams on constrained devices.

**JVM arguments**  
Parameter name: `JVM_ARGS`  
Custom Java Virtual Machine arguments to pass to stream manager at startup. Multiple arguments should be separated by spaces.  
Use this parameter only when you must override the default settings used by the JVM. For example, you might need to increase the default heap size if you plan to export a large number of streams.

**Logging level**  
Parameter name: `LOG_LEVEL`  
The logging level for the component. Choose from the following log levels, listed here in level order:  
+ `TRACE`
+ `DEBUG`
+ `INFO`
+ `WARN`
+ `ERROR`
Default: `INFO`

**Minimum size for multipart upload**  <a name="stream-manager-minimum-part-size"></a>
Parameter name: `STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES`  
The minimum size (in bytes) of a part in a multipart upload to Amazon S3. Stream manager uses this setting and the size of the input file to determine how to batch data in a multipart PUT request. The default and minimum value is `5242880` bytes (5 MB).  
Stream manager uses the stream's `sizeThresholdForMultipartUploadBytes` property to determine whether to export to Amazon S3 as a single or multipart upload. User-defined Greengrass components set this threshold when they create a stream that exports to Amazon S3. The default threshold is 5 MB.

## See also
<a name="configure-stream-manager-see-also"></a>
+ [Manage data streams on Greengrass core devices](manage-data-streams.md)
+ [Use StreamManagerClient to work with streams](work-with-streams.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)

# Create custom components that use stream manager
<a name="use-stream-manager-in-custom-components"></a>

Use stream manager in custom Greengrass components to store, process, and export IoT device data. Use the procedures and examples in this section to create component recipes, artifacts, and applications that work with stream manager. For more information about how to develop and test components, see [Create AWS IoT Greengrass components](create-components.md).

**Topics**
+ [Define component recipes that use stream manager](#stream-manager-recipes)
+ [Connect to stream manager in application code](#connect-to-stream-manager)

## Define component recipes that use stream manager
<a name="stream-manager-recipes"></a>

To use stream manager in a custom component, you must define the `aws.greengrass.StreamManager` component as a dependency. You must also provide the Stream Manager SDK. Complete the following tasks to download and use the Stream Manager SDK in the language of your choice.

### Use the Stream Manager SDK for Java
<a name="use-stream-manager-sdk-java"></a>

The Stream Manager SDK for Java is available as a JAR file that you can use to compile your component. Then, you can create an application JAR that includes the Stream Manager SDK, define the application JAR as a component artifact, and run the application JAR in the component lifecycle.

**To use the Stream Manager SDK for Java**

1. Download the [Stream Manager SDK for Java JAR file](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-java/blob/main/sdk/aws-greengrass-stream-manager-sdk-java.jar).

1. Do one of the following to create component artifacts from your Java application and the Stream Manager SDK JAR file:
   + Build your application as a JAR file that includes the Stream Manager SDK JAR, and run this JAR file in your component recipe.
   + Define the Stream Manager SDK JAR as a component artifact. Add that artifact to the classpath when you run your application in your component recipe.

   Your component recipe might look like the following example. This component runs a modified version of the [StreamManagerS3.java](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-java/blob/main/samples/StreamManagerS3/src/main/java/com/amazonaws/greengrass/examples/StreamManagerS3.java) example, where `StreamManagerS3.jar` includes the Stream Manager SDK JAR.

------
#### [ JSON ]

   ```
   {
     "RecipeFormatVersion": "2020-01-25",
     "ComponentName": "com.example.StreamManagerS3Java",
     "ComponentVersion": "1.0.0",
     "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
     "ComponentPublisher": "Amazon",
     "ComponentDependencies": {
       "aws.greengrass.StreamManager": {
         "VersionRequirement": "^2.0.0"
       }
     },
     "Manifests": [
       {
         "Lifecycle": {
           "Run": "java -jar {artifacts:path}/StreamManagerS3.jar"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar"
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ---
   RecipeFormatVersion: '2020-01-25'
   ComponentName: com.example.StreamManagerS3Java
   ComponentVersion: 1.0.0
   ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
   ComponentPublisher: Amazon
   ComponentDependencies:
     aws.greengrass.StreamManager:
       VersionRequirement: "^2.0.0"
   Manifests:
     - Lifecycle:
         Run: java -jar {artifacts:path}/StreamManagerS3.jar
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar
   ```

------

   For more information about how to develop and test components, see [Create AWS IoT Greengrass components](create-components.md).

### Use the Stream Manager SDK for Python
<a name="use-stream-manager-sdk-python"></a>

The Stream Manager SDK for Python is available as source code that you can include in your component. Create a ZIP file of the Stream Manager SDK, define the ZIP file as a component artifact, and install the SDK's requirements in the component lifecycle.

**To use the Stream Manager SDK for Python**

1. Clone or download the [aws-greengrass-stream-manager-sdk-python](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python) repository.

   ```
   git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
   ```

1. Create a ZIP file that contains the `stream_manager` folder, which contains the source code of the Stream Manager SDK for Python. You can provide this ZIP file as a component artifact that the AWS IoT Greengrass Core software unzips when it installs your component. Do the following:

   1. Open the folder that contains the repository that you cloned or downloaded in the previous step.

      ```
      cd aws-greengrass-stream-manager-sdk-python
      ```

   1. Zip the `stream_manager` folder into a ZIP file named `stream_manager_sdk.zip`.

------
#### [ Linux or Unix ]

      ```
      zip -rv stream_manager_sdk.zip stream_manager
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -acvf stream_manager_sdk.zip stream_manager
      ```

------
#### [ PowerShell ]

      ```
      Compress-Archive stream_manager stream_manager_sdk.zip
      ```

------

   1. Verify that the `stream_manager_sdk.zip` file contains the `stream_manager` folder and its contents. Run the following command to list the contents of the ZIP file.

------
#### [ Linux or Unix ]

      ```
      unzip -l stream_manager_sdk.zip
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -tf stream_manager_sdk.zip
      ```

------

      The output should look similar to the following.

      ```
      Archive:  aws-greengrass-stream-manager-sdk-python/stream_manager.zip
        Length      Date    Time    Name
      ---------  ---------- -----   ----
              0  02-24-2021 20:45   stream_manager/
            913  02-24-2021 20:45   stream_manager/__init__.py
           9719  02-24-2021 20:45   stream_manager/utilinternal.py
           1412  02-24-2021 20:45   stream_manager/exceptions.py
           1004  02-24-2021 20:45   stream_manager/util.py
              0  02-24-2021 20:45   stream_manager/data/
         254463  02-24-2021 20:45   stream_manager/data/__init__.py
          26515  02-24-2021 20:45   stream_manager/streammanagerclient.py
      ---------                     -------
         294026                     8 files
      ```

1. Copy the Stream Manager SDK artifacts to your component's artifacts folder. In addition to the Stream Manager SDK ZIP file, your component uses the SDK's `requirements.txt` file to install the dependencies of the Stream Manager SDK. Replace *\$1/greengrass-components* with the path to the folder that you use for local development.

------
#### [ Linux or Unix ]

   ```
   cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components/artifacts/com.example.StreamManagerS3Python/1.0.0/
   ```

------
#### [ Windows Command Prompt (CMD) ]

   ```
   robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 stream_manager_sdk.zip
   robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 requirements.txt
   ```

------
#### [ PowerShell ]

   ```
   cp .\stream_manager_sdk.zip,.\requirements.txt ~\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0\
   ```

------

1. Create your component recipe. In the recipe, do the following:

   1. Define `stream_manager_sdk.zip` and `requirements.txt` as artifacts.

   1. Define your Python application as an artifact.

   1. In the install lifecycle, install the Stream Manager SDK requirements from `requirements.txt`.

   1. In the run lifecycle, append the Stream Manager SDK to `PYTHONPATH`, and run your Python application.

   Your component recipe might look like the following example. This component runs the [stream\$1manager\$1s3.py](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_s3.py) example.

------
#### [ JSON ]

   ```
   {
     "RecipeFormatVersion": "2020-01-25",
     "ComponentName": "com.example.StreamManagerS3Python",
     "ComponentVersion": "1.0.0",
     "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
     "ComponentPublisher": "Amazon",
     "ComponentDependencies": {
       "aws.greengrass.StreamManager": {
         "VersionRequirement": "^2.0.0"
       }
     },
     "Manifests": [
       {
         "Platform": {
           "os": "linux"
         },
         "Lifecycle": {
           "install": "pip3 install --user -r {artifacts:path}/requirements.txt",
           "Run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt"
           }
         ]
       },
       {
         "Platform": {
           "os": "windows"
         },
         "Lifecycle": {
           "install": "pip3 install --user -r {artifacts:path}/requirements.txt",
           "Run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt"
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ---
   RecipeFormatVersion: '2020-01-25'
   ComponentName: com.example.StreamManagerS3Python
   ComponentVersion: 1.0.0
   ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
   ComponentPublisher: Amazon
   ComponentDependencies:
     aws.greengrass.StreamManager:
       VersionRequirement: "^2.0.0"
   Manifests:
     - Platform:
         os: linux
       Lifecycle:
         install: pip3 install --user -r {artifacts:path}/requirements.txt
         Run: |
           export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
           python3 {artifacts:path}/stream_manager_s3.py
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip
           Unarchive: ZIP
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt
     - Platform:
         os: windows
       Lifecycle:
         install: pip3 install --user -r {artifacts:path}/requirements.txt
         Run: |
           set "PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk"
           py -3 {artifacts:path}/stream_manager_s3.py
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip
           Unarchive: ZIP
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt
   ```

------

   For more information about how to develop and test components, see [Create AWS IoT Greengrass components](create-components.md).

### Use the Stream Manager SDK for JavaScript
<a name="use-stream-manager-sdk-javascript"></a>

The Stream Manager SDK for JavaScript is available as source code that you can include in your component. Create a ZIP file of the Stream Manager SDK, define the ZIP file as a component artifact, and install the SDK in the component lifecycle.

**To use the Stream Manager SDK for JavaScript**

1. Clone or download the [aws-greengrass-stream-manager-sdk-js](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-js) repository.

   ```
   git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
   ```

1. Create a ZIP file that contains the `aws-greengrass-stream-manager-sdk` folder, which contains the source code of the Stream Manager SDK for JavaScript. You can provide this ZIP file as a component artifact that the AWS IoT Greengrass Core software unzips when it installs your component. Do the following:

   1. Open the folder that contains the repository that you cloned or downloaded in the previous step.

      ```
      cd aws-greengrass-stream-manager-sdk-js
      ```

   1. Zip the `aws-greengrass-stream-manager-sdk` folder into a ZIP file named `stream-manager-sdk.zip`.

------
#### [ Linux or Unix ]

      ```
      zip -rv stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -acvf stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      ```

------
#### [ PowerShell ]

      ```
      Compress-Archive aws-greengrass-stream-manager-sdk stream-manager-sdk.zip
      ```

------

   1. Verify that the `stream-manager-sdk.zip` file contains the `aws-greengrass-stream-manager-sdk` folder and its contents. Run the following command to list the contents of the ZIP file.

------
#### [ Linux or Unix ]

      ```
      unzip -l stream-manager-sdk.zip
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -tf stream-manager-sdk.zip
      ```

------

      The output should look similar to the following.

      ```
      Archive:  stream-manager-sdk.zip
        Length      Date    Time    Name
      ---------  ---------- -----   ----
              0  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/
            369  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/package.json
           1017  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/util.js
           8374  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/utilInternal.js
           1937  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/exceptions.js
              0  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/data/
         353343  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/data/index.js
          22599  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/client.js
            216  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/index.js
      ---------                     -------
         387855                     9 files
      ```

1. Copy the Stream Manager SDK artifact to your component's artifacts folder. Replace *\$1/greengrass-components* with the path to the folder that you use for local development.

------
#### [ Linux or Unix ]

   ```
   cp stream-manager-sdk.zip ~/greengrass-components/artifacts/com.example.StreamManagerS3JS/1.0.0/
   ```

------
#### [ Windows Command Prompt (CMD) ]

   ```
   robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0 stream-manager-sdk.zip
   ```

------
#### [ PowerShell ]

   ```
   cp .\stream-manager-sdk.zip ~\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0\
   ```

------

1. Create your component recipe. In the recipe, do the following:

   1. Define `stream-manager-sdk.zip` as an artifact.

   1. Define your JavaScript application as an artifact.

   1. In the install lifecycle, install the Stream Manager SDK from the `stream-manager-sdk.zip` artifact. This `npm install` command creates a `node_modules` folder that contains the Stream Manager SDK and its dependencies.

   1. In the run lifecycle, append the `node_modules` folder to `NODE_PATH`, and run your JavaScript application.

   Your component recipe might look like the following example. This component runs the [StreamManagerS3](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-js/blob/main/samples/StreamManagerS3/index.js) example.

------
#### [ JSON ]

   ```
   {
     "RecipeFormatVersion": "2020-01-25",
     "ComponentName": "com.example.StreamManagerS3JS",
     "ComponentVersion": "1.0.0",
     "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
     "ComponentPublisher": "Amazon",
     "ComponentDependencies": {
       "aws.greengrass.StreamManager": {
         "VersionRequirement": "^2.0.0"
       }
     },
     "Manifests": [
       {
         "Platform": {
           "os": "linux"
         },
         "Lifecycle": {
           "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk",
           "Run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js"
           }
         ]
       },
       {
         "Platform": {
           "os": "windows"
         },
         "Lifecycle": {
           "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk",
           "Run": "set \"NODE_PATH=%NODE_PATH%;{work:path}/node_modules\" & node {artifacts:path}/index.js"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js"
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ---
   RecipeFormatVersion: '2020-01-25'
   ComponentName: com.example.StreamManagerS3JS
   ComponentVersion: 1.0.0
   ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
   ComponentPublisher: Amazon
   ComponentDependencies:
     aws.greengrass.StreamManager:
       VersionRequirement: "^2.0.0"
   Manifests:
     - Platform:
         os: linux
       Lifecycle:
         install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk
         Run: |
           export NODE_PATH=$NODE_PATH:{work:path}/node_modules
           node {artifacts:path}/index.js
       Artifacts:
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip
           Unarchive: ZIP
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js
     - Platform:
         os: windows
       Lifecycle:
         install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk
         Run: |
           set "NODE_PATH=%NODE_PATH%;{work:path}/node_modules"
           node {artifacts:path}/index.js
       Artifacts:
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip
           Unarchive: ZIP
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js
   ```

------

   For more information about how to develop and test components, see [Create AWS IoT Greengrass components](create-components.md).

## Connect to stream manager in application code
<a name="connect-to-stream-manager"></a>

To connect to stream manager in your application, create an instance of `StreamManagerClient` from the Stream Manager SDK. This client connects to the stream manager component on its default port 8088, or the port that you specify. For more information about how to use `StreamManagerClient` after you create an instance, see [Use StreamManagerClient to work with streams](work-with-streams.md).

**Example: Connect to stream manager with default port**  

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;

public class MyStreamManagerComponent {

    void connectToStreamManagerWithDefaultPort() {
        StreamManagerClient client = StreamManagerClientFactory.standard().build();
        
        // Use the client.
    }
}
```

```
from stream_manager import (
    StreamManagerClient
)
              
def connect_to_stream_manager_with_default_port():
    client = StreamManagerClient()
    
    # Use the client.
```

```
const {
    StreamManagerClient
} = require('aws-greengrass-stream-manager-sdk');

function connectToStreamManagerWithDefaultPort() {
    const client = new StreamManagerClient();
    
    // Use the client.
}
```

**Example: Connect to stream manager with non-default port**  
If you configure stream manager with a port other than the default, you must use [interprocess communication](interprocess-communication.md) to retrieve the port from the component configuration.  
The `port` configuration parameter contains the value that you specify in `STREAM_MANAGER_SERVER_PORT` when you deploy stream manager.

```
void connectToStreamManagerWithCustomPort() {
    EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection();
    GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection);
    List<String> keyPath = new ArrayList<>();
    keyPath.add("port");

    GetConfigurationRequest request = new GetConfigurationRequest();
    request.setComponentName("aws.greengrass.StreamManager");
    request.setKeyPath(keyPath);
    GetConfigurationResponse response =
            greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get();
    String port = response.getValue().get("port").toString();
    System.out.print("Stream Manager is running on port: " + port);

    final StreamManagerClientConfig config = StreamManagerClientConfig.builder()
            .serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build();

    StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build();
    
    // Use the client.
}
```

```
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    GetConfigurationRequest
)
from stream_manager import (
    StreamManagerClient
)

TIMEOUT = 10

def connect_to_stream_manager_with_custom_port():
    # Use IPC to get the port from the stream manager component configuration.
    ipc_client = awsiot.greengrasscoreipc.connect()
    request = GetConfigurationRequest()
    request.component_name = "aws.greengrass.StreamManager"
    request.key_path = ["port"]
    operation = ipc_client.new_get_configuration()
    operation.activate(request)
    future_response = operation.get_response()
    response = future_response.result(TIMEOUT)
    stream_manager_port = str(response.value["port"])
    
    # Use port to create a stream manager client.
    stream_client = StreamManagerClient(port=stream_manager_port)
    
    # Use the client.
```

# Use StreamManagerClient to work with streams
<a name="work-with-streams"></a>

User-defined Greengrass components that run on the Greengrass core device can use the `StreamManagerClient` object in the Stream Manager SDK to create streams in [stream manager](manage-data-streams.md) and then interact with the streams. When a component creates a stream, it defines the AWS Cloud destinations, prioritization, and other export and data retention policies for the stream. To send data to stream manager, components append the data to the stream. If an export destination is defined for the stream, stream manager exports the stream automatically.

**Note**  
<a name="stream-manager-clients"></a>Typically, clients of stream manager are user-defined Greengrass components. If your business case requires it, you can also allow non-component processes running on the Greengrass core (for example, a Docker container) to interact with stream manager. For more information, see [Client authentication](manage-data-streams.md#stream-manager-security-client-authentication).

The snippets in this topic show you how clients call `StreamManagerClient` methods to work with streams. For implementation details about the methods and their arguments, use the links to the SDK reference listed after each snippet. 

If you use stream manager in a Lambda function, your Lambda function should instantiate `StreamManagerClient` outside of the function handler. If instantiated in the handler, the function creates a `client` and connection to stream manager every time that it's invoked.

**Note**  
If you do instantiate `StreamManagerClient` in the handler, you must explicitly call the `close()` method when the `client` completes its work. Otherwise, the `client` keeps the connection open and another thread running until the script exits.

`StreamManagerClient` supports the following operations:
+ [Create message stream](#streammanagerclient-create-message-stream)
+ [Append message](#streammanagerclient-append-message)
+ [Read messages](#streammanagerclient-read-messages)
+ [List streams](#streammanagerclient-list-streams)
+ [Describe message stream](#streammanagerclient-describe-message-stream)
+ [Update message stream](#streammanagerclient-update-message-stream)
+ [Delete message stream](#streammanagerclient-delete-message-stream)

## Create message stream
<a name="streammanagerclient-create-message-stream"></a>

To create a stream, a user-defined Greengrass component calls the create method and passes in a `MessageStreamDefinition` object. This object specifies the unique name for the stream and defines how stream manager should handle new data when the maximum stream size is reached. You can use `MessageStreamDefinition` and its data types (such as `ExportDefinition`, `StrategyOnFull`, and `Persistence`) to define other stream properties. These include:
+ The target AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise, and Amazon S3 destinations for automatic exports. For more information, see [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md).
+ Export priority. Stream manager exports higher priority streams before lower priority streams.
+ Maximum batch size and batch interval for AWS IoT Analytics, Kinesis Data Streams, and AWS IoT SiteWise destinations. Stream manager exports messages when either condition is met.
+ Time-to-live (TTL). The amount of time to guarantee that the stream data is available for processing. You should make sure that the data can be consumed within this time period. This is not a deletion policy. The data might not be deleted immediately after TTL period.
+ Stream persistence. Choose to save streams to the file system to persist data across core restarts or save streams in memory.
+ Starting sequence number. Specify the sequence number of the message to use as the starting message in the export.

For more information about `MessageStreamDefinition`, see the SDK reference for your target language:
+ [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html) in the Java SDK
+ [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html) in the Node.js SDK
+ [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition) in the Python SDK

**Note**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` also provides a target destination you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable or supported for use in production environments.

After a stream is created, your Greengrass components can [append messages](#streammanagerclient-append-message) to the stream to send data for export and [read messages](#streammanagerclient-read-messages) from the stream for local processing. The number of streams that you create depends on your hardware capabilities and business case. One strategy is to create a stream for each target channel in AWS IoT Analytics or Kinesis data stream, though you can define multiple targets for a stream. A stream has a durable lifespan.

### Requirements
<a name="streammanagerclient-create-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-create-message-stream-examples"></a>

The following snippet creates a stream named `StreamName`. It defines stream properties in the `MessageStreamDefinition` and subordinate data types.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",    # Required.
        max_size=268435456,    # Default is 256 MB.
        stream_segment_size=16777216,    # Default is 16 MB.
        time_to_live_millis=None,    # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,    # Required.
        persistence=Persistence.File,    # Default is File.
        flush_on_write=False,    # Default is false.
        export_definition=ExportDefinition(    # Optional. Choose where/how the stream is exported to the AWS Cloud.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [create\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.create_message_stream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)    // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)    // Default is 16 MB.
                    .withTimeToLiveMillis(null)    // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)    // Required.
                    .withPersistence(Persistence.File)    // Default is File.
                    .withFlushOnWrite(false)    // Default is false.
                    .withExportDefinition(    // Optional. Choose where/how the stream is exported to the AWS Cloud.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [createMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
  try {
    await client.createMessageStream(
      new MessageStreamDefinition()
        .withName("StreamName") // Required.
        .withMaxSize(268435456)  // Default is 256 MB.
        .withStreamSegmentSize(16777216)  // Default is 16 MB.
        .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
        .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
        .withPersistence(Persistence.File)  // Default is File.
        .withFlushOnWrite(false)  // Default is false.
        .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS Cloud.
          new ExportDefinition()
            .withKinesis(null)
            .withIotAnalytics(null)
            .withIotSiteWise(null)
            .withS3(null)
        )
    );
  } catch (e) {
    // Properly handle errors.
  }
});
client.onError((err) => {
  // Properly handle connection errors.
  // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [createMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

For more information about configuring export destinations, see [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md).

## Append message
<a name="streammanagerclient-append-message"></a>

To send data to stream manager for export, your Greengrass components append the data to the target stream. The export destination determines the data type to pass to this method.

### Requirements
<a name="streammanagerclient-append-message-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics or Kinesis Data Streams export destinations
<a name="streammanagerclient-append-message-blob"></a>

The following snippet appends a message to the stream named `StreamName`. For AWS IoT Analytics or Kinesis Data Streams destinations, your Greengrass components append a blob of data.

This snippet has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [append\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise export destinations
<a name="streammanagerclient-append-message-sitewise"></a>

The following snippet appends a message to the stream named `StreamName`. For AWS IoT SiteWise destinations, your Greengrass components append a serialized `PutAssetPropertyValueEntry` object. For more information, see [Exporting to AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise).

**Note**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>When you send data to AWS IoT SiteWise, your data must meet the requirements of the `BatchPutAssetPropertyValue` action. For more information, see [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) in the *AWS IoT SiteWise API Reference*.

This snippet has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
    # than 10 minutes in the past. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [append\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.PutAssetPropertyValueEntry)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
    // than 10 minutes in the past. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =    new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)

------

#### Amazon S3 export destinations
<a name="streammanagerclient-append-message-export-task"></a>

The following snippet appends an export task to the stream named `StreamName`. For Amazon S3 destinations, your Greengrass components append a serialized `S3ExportTaskDefinition` object that contains information about the source input file and target Amazon S3 object. If the specified object doesn't exist, Stream Manager creates it for you. For more information, see [Exporting to Amazon S3](stream-export-configurations.md#export-streams-to-s3).

This snippet has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [append\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [S3ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [S3ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [S3ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

## Read messages
<a name="streammanagerclient-read-messages"></a>

Read messages from a stream.

### Requirements
<a name="streammanagerclient-read-messages-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-read-messages-examples"></a>

The following snippet reads messages from the stream named `StreamName`. The read method takes an optional `ReadMessagesOptions` object that specifies the sequence number to start reading from, the minimum and maximum numbers to read, and a timeout for reading messages.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,    # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [read\$1messages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.ReadMessagesOptions)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)

------

## List streams
<a name="streammanagerclient-list-streams"></a>

Get the list of streams in stream manager.

### Requirements
<a name="streammanagerclient-list-streams-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-list-streams-examples"></a>

The following snippet gets a list of the streams (by name) in stream manager.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [list\$1streams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [listStreams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [listStreams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

## Describe message stream
<a name="streammanagerclient-describe-message-stream"></a>

Get metadata about a stream, including the stream definition, size, and export status.

### Requirements
<a name="streammanagerclient-describe-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-describe-message-stream-examples"></a>

The following snippet gets metadata about the stream named `StreamName`, including the stream's definition, size, and exporter statuses.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [describe\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [describeMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [describeMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

## Update message stream
<a name="streammanagerclient-update-message-stream"></a>

Update properties of an existing stream. You might want to update a stream if your requirements change after the stream was created. For example:
+ Add a new [export configuration](stream-export-configurations.md) for an AWS Cloud destination.
+ Increase the maximum size of a stream to change how data is exported or retained. For example, the stream size in combination with your strategy on full settings might result in data being deleted or rejected before stream manager can process it.
+ Pause and resume exports; for example, if export tasks are long running and you want to ration your upload data.

Your Greengrass components follow this high-level process to update a stream:

1. [Get the description of the stream.](#streammanagerclient-describe-message-stream)

1. Update the target properties on the corresponding `MessageStreamDefinition` and subordinate objects.

1. Pass in the updated `MessageStreamDefinition`. Make sure to include the complete object definitions for the updated stream. Undefined properties revert to the default values.

   You can specify the sequence number of the message to use as the starting message in the export.

### Requirements
<a name="streammanagerclient-update-message-streamreqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-update-message-stream-examples"></a>

The following snippet updates the stream named `StreamName`. It updates multiple properties of a stream that exports to Kinesis Data Streams.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(    
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [updateMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.update_message_stream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)    // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS Cloud.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [update\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)    // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)    // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)    // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)    // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)    // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)    // Default is false. Updating to true.
                .withExportDefinition(    
                    // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [updateMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### Constraints for updating streams
<a name="streammanagerclient-update-constraints"></a>

The following constraints apply when updating streams. Unless noted in the following list, updates take effect immediately.
+ You can't update a stream's persistence. To change this behavior, [delete the stream](#streammanagerclient-delete-message-stream) and [create a stream](#streammanagerclient-create-message-stream) that defines the new persistence policy.
+ You can update the maximum size of a stream only under the following conditions:
  + The maximum size must be greater or equal to the current size of the stream. <a name="messagestreaminfo-describe-stream"></a>To find this information, [describe the stream](#streammanagerclient-describe-message-stream) and then check the storage status of the returned `MessageStreamInfo` object. 
  + The maximum size must be greater than or equal to the stream's segment size.
+ You can update the stream segment size to a value less than the maximum size of the stream. The updated setting applies to new segments.
+ Updates to the time to live (TTL) property apply to new append operations. If you decrease this value, stream manager might also delete existing segments that exceed the TTL.
+ Updates to the strategy on full property apply to new append operations. If you set the strategy to overwrite the oldest data, stream manager might also overwrite existing segments based on the new setting.
+ Updates to the flush on write property apply to new messages.
+ Updates to export configurations apply to new exports. The update request must include all export configurations that you want to support. Otherwise, stream manager deletes them.
  + When you update an export configuration, specify the identifier of the target export configuration.
  + To add an export configuration, specify a unique identifier for the new export configuration.
  + To delete an export configuration, omit the export configuration.
+ To [update](#streammanagerclient-update-message-stream) the starting sequence number of an export configuration in a stream, you must specify a value that's less than the latest sequence number. <a name="messagestreaminfo-describe-stream"></a>To find this information, [describe the stream](#streammanagerclient-describe-message-stream) and then check the storage status of the returned `MessageStreamInfo` object. 

## Delete message stream
<a name="streammanagerclient-delete-message-stream"></a>

Deletes a stream. When you delete a stream, all of the stored data for the stream is deleted from the disk.

### Requirements
<a name="streammanagerclient-delete-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-sm-sdk"></a>Minimum Stream Manager SDK version: Python: 1.1.0  \$1  Java: 1.1.0  \$1  Node.js: 1.1.0

### Examples
<a name="streammanagerclient-delete-message-stream-examples"></a>

The following snippet deletes the stream named `StreamName`.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [deleteMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [delete\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [deleteMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## See also
<a name="work-with-streams-see-also"></a>
+ [Manage data streams on Greengrass core devices](manage-data-streams.md)
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ `StreamManagerClient` in the Stream Manager SDK reference:
  + [Python](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html)
  + [Java](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# Export configurations for supported AWS Cloud destinations
<a name="stream-export-configurations"></a>

User-defined Greengrass components use `StreamManagerClient` in the Stream Manager SDK to interact with stream manager. When a component [creates a stream](work-with-streams.md#streammanagerclient-create-message-stream) or [updates a stream](work-with-streams.md#streammanagerclient-create-message-stream), it passes a `MessageStreamDefinition` object that represents stream properties, including the export definition. The `ExportDefinition` object contains the export configurations defined for the stream. Stream manager uses these export configurations to determine where and how to export the stream.

![\[Object model diagram of the ExportDefinition property type.\]](http://docs.aws.amazon.com/greengrass/v2/developerguide/images/stream-manager-exportconfigs.png)


You can define zero or more export configurations on a stream, including multiple export configurations for a single destination type. For example, you can export a stream to two AWS IoT Analytics channels and one Kinesis data stream.

For failed export attempts, stream manager continually retries exporting data to the AWS Cloud at intervals of up to five minutes. The number of retry attempts doesn't have a maximum limit.

**Note**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` also provides a target destination you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable or supported for use in production environments.

**Topics**
+ [AWS IoT Analytics channels](#export-to-iot-analytics)
+ [Amazon Kinesis data streams](#export-to-kinesis)
+ [AWS IoT SiteWise asset properties](#export-to-iot-sitewise)
+ [Amazon S3 objects](#export-to-s3)

You are responsible for maintaining these AWS Cloud resources.

## AWS IoT Analytics channels
<a name="export-to-iot-analytics"></a>

Stream manager supports automatic exports to AWS IoT Analytics. <a name="ita-export-destination"></a>AWS IoT Analytics lets you perform advanced analysis on your data to help make business decisions and improve machine learning models. For more information, see [What is AWS IoT Analytics?](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) in the *AWS IoT Analytics User Guide*. 

In the Stream Manager SDK, your Greengrass components use the `IoTAnalyticsConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [IoTAnalyticsConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTAnalyticsConfig) in the Python SDK
+ [IoTAnalyticsConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html) in the Java SDK
+ [IoTAnalyticsConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-iot-analytics-reqs"></a>

This export destination has the following requirements:
+ Target channels in AWS IoT Analytics must be in the same AWS account and AWS Region as the Greengrass core device.
+ The [Authorize core devices to interact with AWS services](device-service-role.md) must allow the `iotanalytics:BatchPutMessage` permission to target channels. For example:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "iotanalytics:BatchPutMessage"
        ],
        "Resource": [
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

To create a stream that exports to AWS IoT Analytics, your Greengrass components [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `IoTAnalyticsConfig` objects. This object defines export settings, such as the target channel, batch size, batch interval, and priority.

When your Greengrass components receive data from devices, they [append messages](work-with-streams.md#streammanagerclient-append-message) that contain a blob of data to the target stream.

Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.

## Amazon Kinesis data streams
<a name="export-to-kinesis"></a>

Stream manager supports automatic exports to Amazon Kinesis Data Streams. <a name="aks-export-destination"></a>Kinesis Data Streams is commonly used to aggregate high-volume data and load it into a data warehouse or MapReduce cluster. For more information, see [What is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html) in the *Amazon Kinesis Developer Guide*. 

In the Stream Manager SDK, your Greengrass components use the `KinesisConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.KinesisConfig) in the Python SDK
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html) in the Java SDK
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-kinesis-reqs"></a>

This export destination has the following requirements:
+ Target streams in Kinesis Data Streams must be in the same AWS account and AWS Region as the Greengrass core device.
+ (Recommended) Stream manager v2.2.1 improves the performance of exporting streams to Kinesis Data Streams destinations. To use the improvements in this latest version, upgrade your [stream manager component](stream-manager-component.md) to v2.2.1 and use the `kinesis:ListShards` policy in the [Greengrass token exchange role](device-service-role.md). 
+ The [Authorize core devices to interact with AWS services](device-service-role.md) must allow the `kinesis:PutRecords` permission to target data streams. For example:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "kinesis:PutRecords"
        ],
        "Resource": [
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

To create a stream that exports to Kinesis Data Streams, your Greengrass components [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `KinesisConfig` objects. This object defines export settings, such as the target data stream, batch size, batch interval, and priority.

When your Greengrass components receive data from devices, they [append messages](work-with-streams.md#streammanagerclient-append-message) that contain a blob of data to the target stream. Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations. 

Stream manager generates a unique, random UUID as a partition key for each record uploaded to Amazon Kinesis. 

## AWS IoT SiteWise asset properties
<a name="export-to-iot-sitewise"></a>

Stream manager supports automatic exports to AWS IoT SiteWise. <a name="itsw-export-destination"></a>AWS IoT SiteWise lets you collect, organize, and analyze data from industrial equipment at scale. For more information, see [What is AWS IoT SiteWise?](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) in the *AWS IoT SiteWise User Guide*. 

In the Stream Manager SDK, your Greengrass components use the `IoTSiteWiseConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [IoTSiteWiseConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTSiteWiseConfig) in the Python SDK
+ [IoTSiteWiseConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html) in the Java SDK
+ [IoTSiteWiseConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html) in the Node.js SDK

**Note**  
AWS also provides AWS IoT SiteWise components, which offer a pre-built solution that you can use to stream data from OPC-UA sources. For more information, see [IoT SiteWise OPC UA collector](iotsitewise-opcua-collector-component.md).

### Requirements
<a name="export-to-iot-sitewise-reqs"></a>

This export destination has the following requirements:
+ Target asset properties in AWS IoT SiteWise must be in the same AWS account and AWS Region as the Greengrass core device.
**Note**  
For the list of AWS Regions that AWS IoT SiteWise supports, see [AWS IoT SiteWise endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region) in the *AWS General Reference*.
+ The [Authorize core devices to interact with AWS services](device-service-role.md) must allow the `iotsitewise:BatchPutAssetPropertyValue` permission to target asset properties. The following example policy uses the `iotsitewise:assetHierarchyPath` condition key to grant access to a target root asset and its children. You can remove the `Condition` from the policy to allow access to all of your AWS IoT SiteWise assets or specify ARNs of individual assets.

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
         "Effect": "Allow",
         "Action": "iotsitewise:BatchPutAssetPropertyValue",
         "Resource": "*",
         "Condition": {
           "StringLike": {
             "iotsitewise:assetHierarchyPath": [
               "/root node asset ID",
               "/root node asset ID/*"
             ]
           }
         }
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

  For important security information, see [ BatchPutAssetPropertyValue authorization](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action) in the *AWS IoT SiteWise User Guide*.

### Exporting to AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

To create a stream that exports to AWS IoT SiteWise, your Greengrass components [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `IoTSiteWiseConfig` objects. This object defines export settings, such as the batch size, batch interval, and priority.

When your Greengrass components receive asset property data from devices, they append messages that contain the data to the target stream. Messages are JSON-serialized `PutAssetPropertyValueEntry` objects that contain property values for one or more asset properties. For more information, see [Append message](work-with-streams.md#streammanagerclient-append-message-sitewise) for AWS IoT SiteWise export destinations.

**Note**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>When you send data to AWS IoT SiteWise, your data must meet the requirements of the `BatchPutAssetPropertyValue` action. For more information, see [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) in the *AWS IoT SiteWise API Reference*.

Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.

You can adjust your stream manager settings and Greengrass component logic to design your export strategy. For example:
+ For near real time exports, set low batch size and interval settings and append the data to the stream when it's received.
+ To optimize batching, mitigate bandwidth constraints, or minimize cost, your Greengrass components can pool the timestamp-quality-value (TQV) data points received for a single asset property before appending the data to the stream. One strategy is to batch entries for up to 10 different property-asset combinations, or property aliases, in one message instead of sending more than one entry for the same property. This helps stream manager to remain within [AWS IoT SiteWise quotas](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html).

## Amazon S3 objects
<a name="export-to-s3"></a>

Stream manager supports automatic exports to Amazon S3. <a name="s3-export-destination"></a>You can use Amazon S3 to store and retrieve large amounts of data. For more information, see [What is Amazon S3?](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html) in the *Amazon Simple Storage Service Developer Guide*. 

In the Stream Manager SDK, your Greengrass components use the `S3ExportTaskExecutorConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [S3ExportTaskExecutorConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskExecutorConfig) in the Python SDK
+ [S3ExportTaskExecutorConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html) in the Java SDK
+ [S3ExportTaskExecutorConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-s3-reqs"></a>

This export destination has the following requirements:
+ Target Amazon S3 buckets must be in the same AWS account as the Greengrass core device.
+ If a Lambda function that runs in **Greengrass container** mode writes input files to an input file directory, you must mount the directory as a volume in the container with write permissions. This ensures that the files are written to the root file system and visible to the stream manager component, which runs outside the container.
+ If a Docker container component writes input files to an input file directory, you must mount the directory as a volume in the container with write permissions. This ensures that the files are written to the root file system and visible to the stream manager component, which runs outside the container.
+ The [Authorize core devices to interact with AWS services](device-service-role.md) must allow the following permissions to the target buckets. For example:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:AbortMultipartUpload",
          "s3:ListMultipartUploadParts"
        ],
        "Resource": [
          "arn:aws:s3:::bucket-1-name/*",
          "arn:aws:s3:::bucket-2-name/*"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to Amazon S3
<a name="export-streams-to-s3"></a>

To create a stream that exports to Amazon S3, your Greengrass components use the `S3ExportTaskExecutorConfig` object to configure the export policy. The policy defines export settings, such as the multipart upload threshold and priority. For Amazon S3 exports, stream manager uploads data that it reads from local files on the core device. To initiate an upload, your Greengrass components append an export task to the target stream. The export task contains information about the input file and target Amazon S3 object. Stream manager runs tasks in the sequence that they are appended to the stream.

**Note**  
 <a name="bucket-not-key-must-exist"></a>The target bucket must already exist in your AWS account. If an object for the specified key doesn't exist, stream manager creates the object for you. 

Stream manager uses the multipart upload threshold property, [minimum part size](configure-stream-manager.md#stream-manager-minimum-part-size) setting, and size of the input file to determine how to upload data. The multipart upload threshold must be greater or equal to the minimum part size. If you want to upload data in parallel, you can create multiple streams.

The keys that specify your target Amazon S3 objects can include valid [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) strings in `!{timestamp:value}` placeholders. You can use these timestamp placeholders to partition data in Amazon S3 based on the time that the input file data was uploaded. For example, the following key name resolves to a value such as `my-key/2020/12/31/data.txt`.

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**Note**  
If you want to monitor the export status for a stream, first create a status stream and then configure the export stream to use it. For more information, see [Monitor export tasks](#monitor-export-status-s3).

#### Manage input data
<a name="manage-s3-input-data"></a>

You can author code that IoT applications use to manage the lifecycle of the input data. The following example workflow shows how you might use Greengrass components to manage this data.

1. A local process receives data from devices or peripherals, and then writes the data to files in a directory on the core device. These are the input files for stream manager.

1. A Greengrass component scans the directory and [appends an export task](work-with-streams.md#streammanagerclient-append-message-export-task) to the target stream when a new file is created. The task is a JSON-serialized `S3ExportTaskDefinition` object that specifies the URL of the input file, the target Amazon S3 bucket and key, and optional user metadata.

1. Stream manager reads the input file and exports the data to Amazon S3 in the order of appended tasks. <a name="bucket-not-key-must-exist"></a>The target bucket must already exist in your AWS account. If an object for the specified key doesn't exist, stream manager creates the object for you. 

1. The Greengrass component [reads messages](work-with-streams.md#streammanagerclient-read-messages) from a status stream to monitor the export status. After export tasks are completed, the Greengrass component can delete the corresponding input files. For more information, see [Monitor export tasks](#monitor-export-status-s3).

### Monitor export tasks
<a name="monitor-export-status-s3"></a>

You can author code that IoT applications use to monitor the status of your Amazon S3 exports. Your Greengrass components must create a status stream and then configure the export stream to write status updates to the status stream. A single status stream can receive status updates from multiple streams that export to Amazon S3.

First, [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) to use as the status stream. You can configure the size and retention policies for the stream to control the lifespan of the status messages. For example:
+ Set `Persistence` to `Memory` if you don't want to store the status messages.
+ Set `StrategyOnFull` to `OverwriteOldestData` so that new status messages are not lost.

Then, create or update the export stream to use the status stream. Specifically, set the status configuration property of the stream’s `S3ExportTaskExecutorConfig` export configuration. This setting tells stream manager to write status messages about the export tasks to the status stream. In the `StatusConfig` object, specify the name of the status stream and the level of verbosity. The following supported values range from least verbose (`ERROR`) to most verbose (`TRACE`). The default is `INFO`.
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

The following example workflow shows how Greengrass components might use a status stream to monitor export status.

1. As described in the previous workflow, a Greengrass component [appends an export task](work-with-streams.md#streammanagerclient-append-message-export-task) to a stream that's configured to write status messages about export tasks to a status stream. The append operation return a sequence number that represents the task ID.

1. A Greengrass component [reads messages](work-with-streams.md#streammanagerclient-read-messages) sequentially from the status stream, and then filters the messages based on the stream name and task ID or based on an export task property from the message context. For example, the Greengrass component can filter by the input file URL of the export task, which is represented by the `S3ExportTaskDefinition` object in the message context.

   The following status codes indicate that an export task has reached a completed state:
   + `Success`. The upload was completed successfully.
   + `Failure`. Stream manager encountered an error, for example, the specified bucket does not exist. After resolving the issue, you can append the export task to the stream again.
   + `Canceled`. The task was stopped because the stream or export definition was deleted, or the time-to-live (TTL) period of the task expired.
**Note**  
The task might also have a status of `InProgress` or `Warning`. Stream manager issues warnings when an event returns an error that doesn't affect the execution of the task. For example, a failure to clean up a partial upload returns a warning.

1. After export tasks are completed, the Greengrass component can delete the corresponding input files.

The following example shows how a Greengrass component might read and process status messages.

------
#### [ Python ]

```
import time
from stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK reference: [read\$1messages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.StatusMessage)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

 try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require(*'aws-greengrass-stream-manager-sdk'*);

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)

------