

End of support notice: On October 7th, 2026, AWS will discontinue support for AWS IoT Greengrass Version 1. After October 7th, 2026, you will no longer be able to access AWS IoT Greengrass V1 resources. For more information, see [Migrate from AWS IoT Greengrass Version 1](https://docs.aws.amazon.com/greengrass/v2/developerguide/migrate-from-v1.html).

# Use StreamManagerClient to work with streams
<a name="work-with-streams"></a>

User-defined Lambda functions running on the AWS IoT Greengrass core can use the `StreamManagerClient` object in the [AWS IoT Greengrass Core SDK](lambda-functions.md#lambda-sdks) to create streams in [stream manager](stream-manager.md) and then interact with the streams. When a Lambda function creates a stream, it defines the AWS Cloud destinations, prioritization, and other export and data retention policies for the stream. To send data to stream manager, Lambda functions append the data to the stream. If an export destination is defined for the stream, stream manager exports the stream automatically.

**Note**  
<a name="stream-manager-clients"></a>Typically, clients of stream manager are user-defined Lambda functions. If your business case requires it, you can also allow non-Lambda processes running on the Greengrass core (for example, a Docker container) to interact with stream manager. For more information, see [Client authentication](stream-manager.md#stream-manager-security-client-authentication).

The snippets in this topic show you how clients call `StreamManagerClient` methods to work with streams. For implementation details about the methods and their arguments, use the links to the SDK reference listed after each snippet. For tutorials that include a complete Python Lambda function, see [Export data streams to the AWS Cloud (console)](stream-manager-console.md) or [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md).

Your Lambda function should instantiate `StreamManagerClient` outside of the function handler. If it's instantiated in the handler, the function creates a new `client` and connection to stream manager every time that it's invoked.

**Note**  
If you do instantiate `StreamManagerClient` in the handler, you must explicitly call the `close()` method when the `client` completes its work. Otherwise, the `client` keeps the connection open and another thread running until the script exits.
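
The pattern described above can be sketched in Python. This is illustrative only: it assumes the `greengrasssdk` package from the AWS IoT Greengrass Core SDK is deployed with your function, and the handler names and payload are placeholders. It requires a stream manager running on the Greengrass core to actually execute.

```
# Sketch only. Requires a stream manager running on the Greengrass core.
from greengrasssdk.stream_manager import StreamManagerClient

# Recommended: instantiate once at module load so every invocation of the
# handler reuses the same client and connection.
client = StreamManagerClient()

def function_handler(event, context):
    client.append_message(stream_name="StreamName", data=b"payload")

# If you must create a client inside a handler, close it explicitly so the
# connection and its thread don't keep running after the work is done.
def handler_with_local_client(event, context):
    local_client = StreamManagerClient()
    try:
        local_client.append_message(stream_name="StreamName", data=b"payload")
    finally:
        local_client.close()
```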

`StreamManagerClient` supports the following operations:
+ [Create message stream](#streammanagerclient-create-message-stream)
+ [Append message](#streammanagerclient-append-message)
+ [Read messages](#streammanagerclient-read-messages)
+ [List streams](#streammanagerclient-list-streams)
+ [Describe message stream](#streammanagerclient-describe-message-stream)
+ [Update message stream](#streammanagerclient-update-message-stream)
+ [Delete message stream](#streammanagerclient-delete-message-stream)

## Create message stream
<a name="streammanagerclient-create-message-stream"></a>

To create a stream, a user-defined Lambda function calls the create method and passes in a `MessageStreamDefinition` object. This object specifies the unique name for the stream and defines how stream manager should handle new data when the maximum stream size is reached. You can use `MessageStreamDefinition` and its data types (such as `ExportDefinition`, `StrategyOnFull`, and `Persistence`) to define other stream properties. These include:
+ The target AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise, and Amazon S3 destinations for automatic exports. For more information, see [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md).
+ Export priority. Stream manager exports higher-priority streams before lower-priority streams.
+ Maximum batch size and batch interval for AWS IoT Analytics, Kinesis Data Streams, and AWS IoT SiteWise destinations. Stream manager exports messages when either condition is met.
+ Time-to-live (TTL). The amount of time that stream data is guaranteed to be available for processing. Make sure that the data can be consumed within this period. This is not a deletion policy; the data might not be deleted immediately after the TTL period expires.
+ Stream persistence. Choose to save streams to the file system to persist data across core restarts or save streams in memory.
+ Starting sequence number. Specify the sequence number of the message to use as the starting message in the export.

For more information about `MessageStreamDefinition`, see the SDK reference for your target language:
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html) in the Java SDK
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html) in the Node.js SDK
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition) in the Python SDK

**Note**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` also provides a target destination you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable or supported for use in production environments.

After a stream is created, your Lambda functions can [append messages](#streammanagerclient-append-message) to the stream to send data for export and [read messages](#streammanagerclient-read-messages) from the stream for local processing. The number of streams that you create depends on your hardware capabilities and business case. One strategy is to create a stream for each target channel in AWS IoT Analytics or each target Kinesis data stream, though you can define multiple targets for a stream. A stream has a durable lifespan.

### Requirements
<a name="streammanagerclient-create-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

**Note**  
Creating streams with an AWS IoT SiteWise or Amazon S3 export destination has the following requirements:  
<a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

### Examples
<a name="streammanagerclient-create-message-stream-examples"></a>

The following snippet creates a stream named `StreamName`. It defines stream properties in the `MessageStreamDefinition` and subordinate data types.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS Cloud.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [create_message_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.create_message_stream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)  // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)  // Default is 16 MB.
                    .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                    .withPersistence(Persistence.File)  // Default is File.
                    .withFlushOnWrite(false)  // Default is false.
                    .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS Cloud.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3TaskExecutor(null)
                    )
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456)  // Default is 256 MB.
                .withStreamSegmentSize(16777216)  // Default is 16 MB.
                .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                .withPersistence(Persistence.File)  // Default is File.
                .withFlushOnWrite(false)  // Default is false.
                .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                        .withIotSitewise(null)
                        .withS3TaskExecutor(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

For more information about configuring export destinations, see [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md).
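
For example, instead of passing `None` for every destination as in the snippets above, a stream that exports to Kinesis Data Streams would populate the `kinesis` field of the `ExportDefinition`. The following Python configuration sketch is illustrative only: the identifier, stream name, and tuning values are placeholders, so verify the exact `KinesisConfig` fields in the SDK reference for your language.

```
# Configuration sketch only; not runnable without a Greengrass core.
from greengrasssdk.stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    StrategyOnFull,
)

definition = MessageStreamDefinition(
    name="StreamName",  # Required.
    strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
    export_definition=ExportDefinition(
        kinesis=[
            KinesisConfig(
                identifier="KinesisExport",  # Unique name for this export configuration.
                kinesis_stream_name="MyKinesisStream",  # Target Kinesis data stream.
                batch_size=100,  # Export when 100 messages have accumulated...
                batch_interval_millis=60000,  # ...or when 60 seconds have elapsed.
                priority=1,  # Lower values are exported before higher values.
            )
        ]
    ),
)
```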

 

## Append message
<a name="streammanagerclient-append-message"></a>

To send data to stream manager for export, your Lambda functions append the data to the target stream. The export destination determines the data type to pass to this method.

### Requirements
<a name="streammanagerclient-append-message-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

**Note**  
Appending messages with an AWS IoT SiteWise or Amazon S3 export destination has the following requirements:  
<a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

### Examples
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics or Kinesis Data Streams export destinations
<a name="streammanagerclient-append-message-blob"></a>

The following snippet appends a message to the stream named `StreamName`. For AWS IoT Analytics or Kinesis Data Streams destinations, your Lambda functions append a blob of data.

This snippet has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [append_message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise export destinations
<a name="streammanagerclient-append-message-sitewise"></a>

The following snippet appends a message to the stream named `StreamName`. For AWS IoT SiteWise destinations, your Lambda functions append a serialized `PutAssetPropertyValueEntry` object. For more information, see [Exporting to AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise).

**Note**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>When you send data to AWS IoT SiteWise, your data must meet the requirements of the `BatchPutAssetPropertyValue` action. For more information, see [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) in the *AWS IoT SiteWise API Reference*.

This snippet has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.

    # Note: To create new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [append_message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) | [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.PutAssetPropertyValueEntry)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>();

    // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) | [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() * maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(`entry-${Date.now()}`) // Entry IDs must be unique within the request.
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) | [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)

------

#### Amazon S3 export destinations
<a name="streammanagerclient-append-message-export-task"></a>

The following snippet appends an export task to the stream named `StreamName`. For Amazon S3 destinations, your Lambda functions append a serialized `S3ExportTaskDefinition` object that contains information about the source input file and target Amazon S3 object. If the specified object doesn't exist, stream manager creates it for you. For more information, see [Exporting to Amazon S3](stream-export-configurations.md#export-streams-to-s3).

This snippet has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [append_message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) | [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) | [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        // Append an Amazon S3 export task definition and print the sequence number.
        const taskDefinition = new S3ExportTaskDefinition()
            .withBucket("BucketName")
            .withKey("KeyName")
            .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) | [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

 

## Read messages
<a name="streammanagerclient-read-messages"></a>

Read messages from a stream.

### Requirements
<a name="streammanagerclient-read-messages-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

### Examples
<a name="streammanagerclient-read-messages-examples"></a>

The following snippet reads messages from the stream named `StreamName`. The read method takes an optional `ReadMessagesOptions` object that specifies the sequence number to start reading from, the minimum and maximum number of messages to read, and the timeout for reading messages.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_message_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [read_messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) | [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.ReadMessagesOptions)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the minimum message count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) | [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) | [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)

------

 

## List streams
<a name="streammanagerclient-list-streams"></a>

Get the list of streams in stream manager.

### Requirements
<a name="streammanagerclient-list-streams-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

### Examples
<a name="streammanagerclient-list-streams-examples"></a>

The following snippet gets a list of the streams (by name) in stream manager.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [list_streams](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [listStreams](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [listStreams](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

 

## Describe message stream
<a name="streammanagerclient-describe-message-stream"></a>

Get metadata about a stream, including the stream definition, size, and export status.

### Requirements
<a name="streammanagerclient-describe-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

### Examples
<a name="streammanagerclient-describe-message-stream-examples"></a>

The following snippet gets metadata about the stream named `StreamName`, including the stream's definition, size, and exporter statuses.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [describe\_message\_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

 

## Update message stream
<a name="streammanagerclient-update-message-stream"></a>

Update properties of an existing stream. You might want to update a stream if your requirements change after the stream was created. For example:
+ Add a new [export configuration](stream-export-configurations.md) for an AWS Cloud destination.
+ Increase the maximum size of a stream to change how data is exported or retained. For example, the stream size in combination with your strategy on full settings might result in data being deleted or rejected before stream manager can process it.
+ Pause and resume exports; for example, if export tasks are long running and you want to ration your upload data.

Your Lambda functions follow this high-level process to update a stream:

1. [Get the description of the stream.](#streammanagerclient-describe-message-stream)

1. Update the target properties on the corresponding `MessageStreamDefinition` and subordinate objects.

1. Pass in the updated `MessageStreamDefinition`. Make sure to include the complete object definitions for the updated stream. Undefined properties revert to the default values.

   You can specify the sequence number of the message to use as the starting message in the export.

### Requirements
<a name="-streammanagerclient-update-message-streamreqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core version: 1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

### Examples
<a name="streammanagerclient-update-message-stream-examples"></a>

The following snippet updates the stream named `StreamName`. It updates multiple properties of a stream that exports to Kinesis Data Streams.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size = 536870912
    message_stream_info.definition.stream_segment_size = 33554432
    message_stream_info.definition.time_to_live_millis = 3600000
    message_stream_info.definition.strategy_on_full = StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence = Persistence.Memory
    message_stream_info.definition.flush_on_write = False
    # Update the export definition to add a Kinesis data stream configuration.
    message_stream_info.definition.export_definition.kinesis = [
        KinesisConfig(identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))
    ]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [update\_message\_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.update_message_stream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)  // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS Cloud.
                messageStreamInfo.getDefinition().getExportDefinition()
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await client.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)  // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)  // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)  // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)  // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)  // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)  // Default is false. Updating to true.
                .withExportDefinition(  
                    // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream) | [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### Constraints for updating streams
<a name="streammanagerclient-update-constraints"></a>

The following constraints apply when updating streams. Unless noted in the following list, updates take effect immediately.
+ You can't update a stream's persistence. To change this behavior, [delete the stream](#streammanagerclient-delete-message-stream) and [create a stream](#streammanagerclient-create-message-stream) that defines the new persistence policy.
+ You can update the maximum size of a stream only under the following conditions:
  + The maximum size must be greater than or equal to the current size of the stream. <a name="messagestreaminfo-describe-stream"></a>To find this information, [describe the stream](#streammanagerclient-describe-message-stream) and then check the storage status of the returned `MessageStreamInfo` object.
  + The maximum size must be greater than or equal to the stream's segment size.
+ You can update the stream segment size to a value less than the maximum size of the stream. The updated setting applies to new segments.
+ Updates to the time to live (TTL) property apply to new append operations. If you decrease this value, stream manager might also delete existing segments that exceed the TTL.
+ Updates to the strategy on full property apply to new append operations. If you set the strategy to overwrite the oldest data, stream manager might also overwrite existing segments based on the new setting.
+ Updates to the flush on write property apply to new messages.
+ Updates to export configurations apply to new exports. The update request must include all export configurations that you want to support. Otherwise, stream manager deletes them.
  + When you update an export configuration, specify the identifier of the target export configuration.
  + To add an export configuration, specify a unique identifier for the new export configuration.
  + To delete an export configuration, omit the export configuration.
+ To [update](#streammanagerclient-update-message-stream) the starting sequence number of an export configuration in a stream, you must specify a value that's less than the latest sequence number. <a name="messagestreaminfo-describe-stream"></a>To find this information, [describe the stream](#streammanagerclient-describe-message-stream) and then check the storage status of the returned `MessageStreamInfo` object.
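
The export-configuration rules above can be illustrated with a small, hypothetical helper. Plain dictionaries stand in for the SDK's export configuration objects, and `build_updated_exports` is not part of the AWS IoT Greengrass Core SDK:

```
def build_updated_exports(current, changes=None, additions=(), removals=()):
    """Build the full export-configuration list to send in an update request.

    Stream manager replaces the stream's export configurations with exactly
    what the update request contains, so any configuration omitted from the
    request is deleted.

    current   -- dict of existing configs keyed by export identifier
    changes   -- dict of per-identifier property overrides
    additions -- iterable of new configs (each needs a unique "identifier")
    removals  -- identifiers to delete by omitting them from the request
    """
    changes = changes or {}
    updated = {}
    for identifier, config in current.items():
        if identifier in removals:
            continue  # omitting a config deletes it on update
        updated[identifier] = {**config, **changes.get(identifier, {})}
    for config in additions:
        updated[config["identifier"]] = config
    return list(updated.values())
```

The key point is that the result always contains every configuration you want to keep, not just the ones you changed.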

 

## Delete message stream
<a name="streammanagerclient-delete-message-stream"></a>

Deletes a stream. When you delete a stream, all of the stored data for the stream is deleted from the disk.

### Requirements
<a name="streammanagerclient-delete-message-stream-reqs"></a>

This operation has the following requirements:
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core version: 1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

### Examples
<a name="streammanagerclient-delete-message-stream-examples"></a>

The following snippet deletes the stream named `StreamName`.

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [delete\_message\_stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## See also
<a name="work-with-streams-see-also"></a>
+ [Manage data streams on the AWS IoT Greengrass core](stream-manager.md)
+ [Configure AWS IoT Greengrass stream manager](configure-stream-manager.md)
+ [Export configurations for supported AWS Cloud destinations](stream-export-configurations.md)
+ [Export data streams to the AWS Cloud (console)](stream-manager-console.md)
+ [Export data streams to the AWS Cloud (CLI)](stream-manager-cli.md)
+ `StreamManagerClient` in the AWS IoT Greengrass Core SDK reference:
  + [Python](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html)
  + [Java](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# Export configurations for supported AWS Cloud destinations
<a name="stream-export-configurations"></a>

User-defined Lambda functions use `StreamManagerClient` in the AWS IoT Greengrass Core SDK to interact with stream manager. When a Lambda function [creates a stream](work-with-streams.md#streammanagerclient-create-message-stream) or [updates a stream](work-with-streams.md#streammanagerclient-update-message-stream), it passes a `MessageStreamDefinition` object that represents stream properties, including the export definition. The `ExportDefinition` object contains the export configurations defined for the stream. Stream manager uses these export configurations to determine where and how to export the stream.

![\[Object model diagram of the ExportDefinition property type.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-exportconfigs.png)


You can define zero or more export configurations on a stream, including multiple export configurations for a single destination type. For example, you can export a stream to two AWS IoT Analytics channels and one Kinesis data stream.

If an export attempt fails, stream manager retries exporting the data to the AWS Cloud at intervals of up to five minutes. There is no maximum number of retry attempts.

**Note**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` also provides a target destination you can use to export streams to an HTTP server. This target is intended for testing purposes only. It is not stable or supported for use in production environments.

**Topics**
+ [AWS IoT Analytics channels](#export-to-iot-analytics)
+ [Amazon Kinesis data streams](#export-to-kinesis)
+ [AWS IoT SiteWise asset properties](#export-to-iot-sitewise)
+ [Amazon S3 objects](#export-to-s3)

You are responsible for maintaining these AWS Cloud resources.

## AWS IoT Analytics channels
<a name="export-to-iot-analytics"></a>

Stream manager supports automatic exports to AWS IoT Analytics. <a name="ita-export-destination"></a>AWS IoT Analytics lets you perform advanced analysis on your data to help make business decisions and improve machine learning models. For more information, see [What is AWS IoT Analytics?](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) in the *AWS IoT Analytics User Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `IoTAnalyticsConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTAnalyticsConfig) in the Python SDK
+ [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html) in the Java SDK
+ [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-iot-analytics-reqs"></a>

This export destination has the following requirements:
+ Target channels in AWS IoT Analytics must be in the same AWS account and AWS Region as the Greengrass group.
+ The [Greengrass group role](group-role.md) must allow the `iotanalytics:BatchPutMessage` permission to target channels. For example:

------
#### [ JSON ]


  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "iotanalytics:BatchPutMessage"
              ],
              "Resource": [
                  "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
                  "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

To create a stream that exports to AWS IoT Analytics, your Lambda functions [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `IoTAnalyticsConfig` objects. This object defines export settings, such as the target channel, batch size, batch interval, and priority.

When your Lambda functions receive data from devices, they [append messages](work-with-streams.md#streammanagerclient-append-message) that contain a blob of data to the target stream.

Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.
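
As a mental model for those batch settings, a batch is flushed when either the batch size or the batch interval threshold is reached, whichever comes first. The following stdlib-only sketch illustrates that decision; `should_export` and its parameters are illustrative, not part of the AWS IoT Greengrass Core SDK:

```
def should_export(pending_messages, batch_size, seconds_since_last_export, batch_interval_secs):
    """Return True when a batch should be flushed: either enough messages
    have accumulated or the batch interval has elapsed."""
    if pending_messages == 0:
        return False  # nothing to export yet
    return pending_messages >= batch_size or seconds_since_last_export >= batch_interval_secs
```

Smaller batch sizes and intervals trade upload efficiency for lower export latency.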

 

## Amazon Kinesis data streams
<a name="export-to-kinesis"></a>

Stream manager supports automatic exports to Amazon Kinesis Data Streams. <a name="aks-export-destination"></a>Kinesis Data Streams is commonly used to aggregate high-volume data and load it into a data warehouse or map-reduce cluster. For more information, see [What is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html) in the *Amazon Kinesis Developer Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `KinesisConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.KinesisConfig) in the Python SDK
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html) in the Java SDK
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-kinesis-reqs"></a>

This export destination has the following requirements:
+ Target streams in Kinesis Data Streams must be in the same AWS account and AWS Region as the Greengrass group.
+ The [Greengrass group role](group-role.md) must allow the `kinesis:PutRecords` permission to target data streams. For example:

------
#### [ JSON ]


  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
                  "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
                  "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

To create a stream that exports to Kinesis Data Streams, your Lambda functions [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `KinesisConfig` objects. This object defines export settings, such as the target data stream, batch size, batch interval, and priority.

When your Lambda functions receive data from devices, they [append messages](work-with-streams.md#streammanagerclient-append-message) that contain a blob of data to the target stream. Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.

Stream manager generates a unique, random UUID as a partition key for each record uploaded to Amazon Kinesis. 
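
This behavior is equivalent to generating a random version 4 UUID per record, as in the following stdlib-only sketch (the `partition_key_for_record` helper is illustrative, not SDK code):

```
import uuid

def partition_key_for_record():
    """Generate a unique, random partition key, as stream manager does
    for each record it uploads to Kinesis Data Streams."""
    return str(uuid.uuid4())
```

Because the keys are random, records from one stream are spread roughly evenly across the shards of the target data stream.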

 

## AWS IoT SiteWise asset properties
<a name="export-to-iot-sitewise"></a>

Stream manager supports automatic exports to AWS IoT SiteWise. <a name="itsw-export-destination"></a>AWS IoT SiteWise lets you collect, organize, and analyze data from industrial equipment at scale. For more information, see [What is AWS IoT SiteWise?](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) in the *AWS IoT SiteWise User Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `IoTSiteWiseConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTSiteWiseConfig) in the Python SDK
+ [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html) in the Java SDK
+ [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html) in the Node.js SDK

**Note**  
AWS also provides the [IoT SiteWise connector](iot-sitewise-connector.md), which is a pre-built solution that you can use with OPC-UA sources.

### Requirements
<a name="export-to-iot-sitewise-reqs"></a>

This export destination has the following requirements:
+ Target asset properties in AWS IoT SiteWise must be in the same AWS account and AWS Region as the Greengrass group.
**Note**  
For the list of Regions that AWS IoT SiteWise supports, see [AWS IoT SiteWise endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region) in the *AWS General Reference*.
+ The [Greengrass group role](group-role.md) must allow the `iotsitewise:BatchPutAssetPropertyValue` permission to target asset properties. The following example policy uses the `iotsitewise:assetHierarchyPath` condition key to grant access to a target root asset and its children. You can remove the `Condition` from the policy to allow access to all of your AWS IoT SiteWise assets or specify ARNs of individual assets.

------
#### [ JSON ]


  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
               "Effect": "Allow",
               "Action": "iotsitewise:BatchPutAssetPropertyValue",
               "Resource": "*",
               "Condition": {
                   "StringLike": {
                       "iotsitewise:assetHierarchyPath": [
                           "/root node asset ID",
                           "/root node asset ID/*"
                       ]
                   }
               }
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

  For important security information, see [ BatchPutAssetPropertyValue authorization](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action) in the *AWS IoT SiteWise User Guide*.

### Exporting to AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

To create a stream that exports to AWS IoT SiteWise, your Lambda functions [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) with an export definition that includes one or more `IoTSiteWiseConfig` objects. This object defines export settings, such as the batch size, batch interval, and priority.

When your Lambda functions receive asset property data from devices, they append messages that contain the data to the target stream. Messages are JSON-serialized `PutAssetPropertyValueEntry` objects that contain property values for one or more asset properties. For more information, see [Append message](work-with-streams.md#streammanagerclient-append-message-sitewise) for AWS IoT SiteWise export destinations.

**Note**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>When you send data to AWS IoT SiteWise, your data must meet the requirements of the `BatchPutAssetPropertyValue` action. For more information, see [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html) in the *AWS IoT SiteWise API Reference*.
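
For reference, each appended message deserializes to an entry with the same shape that the `BatchPutAssetPropertyValue` action accepts. The alias, timestamp, and value in this example are illustrative:

```
{
    "entryId": "sample-entry-1",
    "propertyAlias": "/company/windfarm/3/turbine/7/temperature",
    "propertyValues": [
        {
            "value": { "doubleValue": 26.7 },
            "timestamp": { "timeInSeconds": 1600000000, "offsetInNanos": 0 },
            "quality": "GOOD"
        }
    ]
}
```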

Then, stream manager exports the data based on the batch settings and priority defined in the stream's export configurations.

 

You can adjust your stream manager settings and Lambda function logic to design your export strategy. For example:
+ For near real time exports, set low batch size and interval settings and append the data to the stream when it's received.
+ To optimize batching, mitigate bandwidth constraints, or minimize cost, your Lambda functions can pool the timestamp-quality-value (TQV) data points received for a single asset property before appending the data to the stream. One strategy is to batch entries for up to 10 different property-asset combinations, or property aliases, in one message instead of sending more than one entry for the same property. This helps stream manager to remain within [AWS IoT SiteWise quotas](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html).
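
The pooling strategy in the second bullet can be sketched with stdlib-only Python. The helper and dictionary shapes below are illustrative; it pools all timestamp-quality-value (TQV) points for a property alias into a single entry, then packs at most 10 entries into each message:

```
from collections import defaultdict

MAX_ENTRIES_PER_MESSAGE = 10  # one entry per property alias, up to 10 aliases per message

def pool_tqv_points(points):
    """Group (property_alias, tqv) data points so that each property alias
    appears in exactly one entry, then pack entries into messages of at
    most MAX_ENTRIES_PER_MESSAGE entries each."""
    by_alias = defaultdict(list)
    for alias, tqv in points:
        by_alias[alias].append(tqv)  # pool all TQVs for the same alias
    entries = [
        {"entryId": str(i), "propertyAlias": alias, "propertyValues": tqvs}
        for i, (alias, tqvs) in enumerate(by_alias.items())
    ]
    # Chunk the entries into messages of up to MAX_ENTRIES_PER_MESSAGE entries.
    return [entries[i:i + MAX_ENTRIES_PER_MESSAGE]
            for i in range(0, len(entries), MAX_ENTRIES_PER_MESSAGE)]
```

Each resulting message would then be JSON-serialized and appended to the stream as a single append operation.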

 

## Amazon S3 objects
<a name="export-to-s3"></a>

Stream manager supports automatic exports to Amazon S3. <a name="s3-export-destination"></a>You can use Amazon S3 to store and retrieve large amounts of data. For more information, see [What is Amazon S3?](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html) in the *Amazon Simple Storage Service Developer Guide*.

In the AWS IoT Greengrass Core SDK, your Lambda functions use the `S3ExportTaskExecutorConfig` to define the export configuration for this destination type. For more information, see the SDK reference for your target language:
+ [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskExecutorConfig) in the Python SDK
+ [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html) in the Java SDK
+ [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html) in the Node.js SDK

### Requirements
<a name="export-to-s3-reqs"></a>

This export destination has the following requirements:
+ Target Amazon S3 buckets must be in the same AWS account as the Greengrass group.
+ If the [default containerization](lambda-group-config.md#lambda-containerization-groupsettings) for the Greengrass group is **Greengrass container**, you must set the [STREAM_MANAGER_READ_ONLY_DIRS](configure-stream-manager.md#stream-manager-read-only-directories) parameter to use an input file directory that's under `/tmp` or isn't on the root file system.
+ If a Lambda function running in **Greengrass container** mode writes input files to the input file directory, you must create a local volume resource for the directory and mount the directory to the container with write permissions. This ensures that the files are written to the root file system and visible outside the container. For more information, see [Access local resources with Lambda functions and connectors](access-local-resources.md).
+ The [Greengrass group role](group-role.md) must allow the following permissions to the target buckets. For example:

------
#### [ JSON ]

****  

  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "s3:PutObject",
                  "s3:AbortMultipartUpload",
                  "s3:ListMultipartUploadParts"
              ],
              "Resource": [
                  "arn:aws:s3:::bucket-1-name/*",
                  "arn:aws:s3:::bucket-2-name/*"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>You can grant granular or conditional access to resources, for example, by using a wildcard `*` naming scheme. For more information, see [Adding and removing IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

### Exporting to Amazon S3
<a name="export-streams-to-s3"></a>

To create a stream that exports to Amazon S3, your Lambda functions use the `S3ExportTaskExecutorConfig` object to configure the export policy. The policy defines export settings, such as the multipart upload threshold and priority. For Amazon S3 exports, stream manager uploads data that it reads from local files on the core device. To initiate an upload, your Lambda functions append an export task to the target stream. The export task contains information about the input file and target Amazon S3 object. Stream manager executes tasks in the sequence that they are appended to the stream.

**Note**  
<a name="bucket-not-key-must-exist"></a>The target bucket must already exist in your AWS account. If an object for the specified key doesn't exist, stream manager creates the object for you.

This high-level workflow is shown in the following diagram.

![\[Diagram of the stream manager workflow for Amazon S3 exports.\]](http://docs.aws.amazon.com/greengrass/v1/developerguide/images/stream-manager-s3.png)


Stream manager uses the multipart upload threshold property, [minimum part size](configure-stream-manager.md#stream-manager-minimum-part-size) setting, and size of the input file to determine how to upload data. The multipart upload threshold must be greater than or equal to the minimum part size. If you want to upload data in parallel, you can create multiple streams.
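As an illustration of how these three values interact, the following sketch shows the kind of decision involved. This is not stream manager's actual implementation, and the function name is hypothetical; it only demonstrates the constraint that the threshold must be at least the minimum part size, and that files below the threshold can go up in a single request.

```python
def plan_upload(file_size, multipart_threshold, minimum_part_size):
    """Illustrative sketch: decide between a single upload and a multipart
    upload for a file of file_size bytes. Stream manager's internal logic
    may differ."""
    if multipart_threshold < minimum_part_size:
        raise ValueError("multipart threshold must be >= minimum part size")
    if file_size < multipart_threshold:
        # Small file: a single upload request is enough.
        return {"multipart": False, "parts": 1}
    # Large file: split into parts of at least minimum_part_size bytes.
    parts = max(1, file_size // minimum_part_size)
    return {"multipart": True, "parts": parts}


# A 4 MB file with a 5 MB threshold is uploaded in one request;
# a 12 MB file with a 5 MB minimum part size is split into parts.
MB = 1024 * 1024
small = plan_upload(4 * MB, 5 * MB, 5 * MB)
large = plan_upload(12 * MB, 5 * MB, 5 * MB)
```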

The keys that specify your target Amazon S3 objects can include valid [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) strings in `!{timestamp:value}` placeholders. You can use these timestamp placeholders to partition data in Amazon S3 based on the time that the input file data was uploaded. For example, the following key name resolves to a value such as `my-key/2020/12/31/data.txt`.

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```
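The following sketch shows how such a key could resolve, using a small mapping from the Java `DateTimeFormatter` patterns in the example to Python `strftime` codes. This is illustrative only: stream manager performs this substitution itself, and the mapping below handles only the patterns shown above, not the full `DateTimeFormatter` syntax.

```python
import re
from datetime import datetime, timezone

# Map the DateTimeFormatter patterns used above to strftime codes.
PATTERN_MAP = {"YYYY": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def resolve_key(key, when):
    """Replace each !{timestamp:pattern} placeholder in key with the
    corresponding formatted component of the datetime `when`."""
    def sub(match):
        return when.strftime(PATTERN_MAP[match.group(1)])
    return re.sub(r"!\{timestamp:([^}]+)\}", sub, key)


key = "my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt"
resolved = resolve_key(key, datetime(2020, 12, 31, tzinfo=timezone.utc))
# resolved == "my-key/2020/12/31/data.txt"
```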

**Note**  
If you want to monitor the export status for a stream, first create a status stream and then configure the export stream to use it. For more information, see [Monitor export tasks](#monitor-export-status-s3).

#### Manage input data
<a name="manage-s3-input-data"></a>

You can author code that IoT applications use to manage the lifecycle of the input data. The following example workflow shows how you might use Lambda functions to manage this data.

1. A local process receives data from devices or peripherals, and then writes the data to files in a directory on the core device. These are the input files for stream manager.
**Note**  
To determine if you must configure access to the input file directory, see the [STREAM_MANAGER_READ_ONLY_DIRS](configure-stream-manager.md#stream-manager-read-only-directories) parameter.  
The process that stream manager runs in inherits all of the file system permissions of the [default access identity](lambda-group-config.md#lambda-access-identity-groupsettings) for the group. Stream manager must have permission to access the input files. You can use the `chmod(1)` command to change the permission of the files, if necessary.

1. A Lambda function scans the directory and [appends an export task](work-with-streams.md#streammanagerclient-append-message-export-task) to the target stream when a new file is created. The task is a JSON-serialized `S3ExportTaskDefinition` object that specifies the URL of the input file, the target Amazon S3 bucket and key, and optional user metadata.

1. Stream manager reads the input file and exports the data to Amazon S3 in the order of appended tasks. <a name="bucket-not-key-must-exist"></a>The target bucket must already exist in your AWS account. If an object for the specified key doesn't exist, stream manager creates the object for you.

1. The Lambda function [reads messages](work-with-streams.md#streammanagerclient-read-messages) from a status stream to monitor the export status. After export tasks are completed, the Lambda function can delete the corresponding input files. For more information, see [Monitor export tasks](#monitor-export-status-s3).
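The scan-and-append step (step 2) can be sketched in plain Python. The dictionaries below only mirror the fields of an `S3ExportTaskDefinition`; a real Lambda function would construct the SDK object, serialize it to JSON bytes, and append it to the target stream with the append-message operation. The bucket name and key prefix are hypothetical.

```python
import os
import tempfile


def build_export_tasks(input_dir, bucket, key_prefix):
    """For each file in input_dir, build a dict that mirrors the fields of an
    S3ExportTaskDefinition: the input file URL, target bucket, and object key."""
    tasks = []
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        if os.path.isfile(path):
            tasks.append({
                "inputUrl": "file://" + path,    # URL of the local input file
                "bucket": bucket,                # target bucket (must already exist)
                "key": key_prefix + "/" + name,  # target object key
            })
    return tasks


# Example with a temporary directory standing in for the input file directory.
with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "data-001.txt"), "w").close()
    tasks = build_export_tasks(d, "amzn-s3-demo-bucket", "sensor-data")
```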

### Monitor export tasks
<a name="monitor-export-status-s3"></a>

You can author code that IoT applications use to monitor the status of your Amazon S3 exports. Your Lambda functions must create a status stream and then configure the export stream to write status updates to the status stream. A single status stream can receive status updates from multiple streams that export to Amazon S3.

First, [create a stream](work-with-streams.md#streammanagerclient-create-message-stream) to use as the status stream. You can configure the size and retention policies for the stream to control the lifespan of the status messages. For example:
+ Set `Persistence` to `Memory` if you don't want to store the status messages.
+ Set `StrategyOnFull` to `OverwriteOldestData` so that new status messages are not lost.

Then, create or update the export stream to use the status stream. Specifically, set the status configuration property of the stream’s `S3ExportTaskExecutorConfig` export configuration. This tells stream manager to write status messages about the export tasks to the status stream. In the `StatusConfig` object, specify the name of the status stream and the level of verbosity. The following supported values range from least verbose (`ERROR`) to most verbose (`TRACE`). The default is `INFO`.
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

 

The following example workflow shows how Lambda functions might use a status stream to monitor export status.

1. As described in the previous workflow, a Lambda function [appends an export task](work-with-streams.md#streammanagerclient-append-message-export-task) to a stream that's configured to write status messages about export tasks to a status stream. The append operation returns a sequence number that represents the task ID.

1. A Lambda function [reads messages](work-with-streams.md#streammanagerclient-read-messages) sequentially from the status stream, and then filters the messages based on the stream name and task ID or based on an export task property from the message context. For example, the Lambda function can filter by the input file URL of the export task, which is represented by the `S3ExportTaskDefinition` object in the message context.

   The following status codes indicate that an export task has reached a completed state:
   + `Success`. The upload was completed successfully.
   + `Failure`. Stream manager encountered an error, for example, the specified bucket does not exist. After resolving the issue, you can append the export task to the stream again.
   + `Canceled`. The task was aborted because the stream or export definition was deleted, or the time-to-live (TTL) period of the task expired.
**Note**  
The task might also have a status of `InProgress` or `Warning`. Stream manager issues warnings when an event returns an error that doesn't affect the execution of the task. For example, a failure to clean up an aborted partial upload returns a warning.

1. After export tasks are completed, the Lambda function can delete the corresponding input files.
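The cleanup step can be sketched as follows. The payload shape here is a simplified stand-in for the SDK's `StatusMessage` (the status plus the `S3ExportTaskDefinition` in the status context); the field names are illustrative, not the SDK's exact serialized form.

```python
import os
import tempfile


def clean_up_if_complete(status_payload):
    """If a status message reports Success, delete the local input file that
    the corresponding export task referenced. Returns True if the task was
    complete and the file was handled."""
    if status_payload.get("status") != "Success":
        return False
    input_url = status_payload["statusContext"]["s3ExportTaskDefinition"]["inputUrl"]
    path = input_url[len("file://"):]  # strip the file URL scheme
    if os.path.exists(path):
        os.remove(path)
    return True


# Example: a temporary file stands in for an exported input file.
with tempfile.NamedTemporaryFile(delete=False) as f:
    path = f.name
message = {
    "status": "Success",
    "statusContext": {"s3ExportTaskDefinition": {"inputUrl": "file://" + path}},
}
deleted = clean_up_if_complete(message)
```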

The following example shows how a Lambda function might read and process status messages.

------
#### [ Python ]

```
import asyncio
import logging
import time

from greengrasssdk.stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
    StreamManagerException,
)
from greengrasssdk.stream_manager.util import Util

logger = logging.getLogger(__name__)

client = StreamManagerClient()

# URL of the input file from the export task that was appended to the stream.
file_url = "file:///path/to/file"

try:
    # Read the statuses from the export status stream.
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                # We log the reason why the upload to S3 failed from the status message.
                # If the status was "InProgress", the server has started uploading the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
```

Python SDK reference: [read\_messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) \| [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.StatusMessage)

------
#### [ Java ]

```
import java.util.List;

import com.amazonaws.greengrass.streammanager.client.GreengrassClientBuilder;
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.exception.StreamManagerException;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.Message;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream.
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We print the reason why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                    } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        isS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for some time to let the S3 upload task complete before trying to read the status message again.
                Thread.sleep(5000);
            }
        }
    } catch (InterruptedException e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \| [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require('aws-greengrass-core-sdk').StreamManager;

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream.
                const messages = await client.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Canceled', the server was unable to upload the file to S3.
                    // We print the reason why the upload to S3 failed from the status message.
                    // If the status was 'InProgress', the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for some time to let the S3 upload task complete before trying to read the status message again.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK reference: [readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \| [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)

------