AWS IoT Greengrass Version 1 entered the extended life phase on June 30, 2023. For more information, see the AWS IoT Greengrass V1 maintenance policy. After this date, AWS IoT Greengrass V1 won't release updates that provide features, enhancements, bug fixes, or security patches. Devices that run on AWS IoT Greengrass V1 won't be disrupted and will continue to operate and to connect to the cloud. We strongly recommend that you migrate to AWS IoT Greengrass Version 2, which adds significant new features and support for additional platforms.
Use StreamManagerClient to work with streams
User-defined Lambda functions running on the AWS IoT Greengrass core can use the
StreamManagerClient
object in the AWS IoT Greengrass Core SDK to create streams in stream
manager and then interact with the streams. When a Lambda function creates a
stream, it defines the AWS Cloud destinations, prioritization, and other export and data
retention policies for the stream. To send data to stream manager, Lambda functions append
the data to the stream. If an export destination is defined for the stream, stream manager
exports the stream automatically.
Note
Typically, clients of stream manager are user-defined Lambda functions. If your business case requires it, you can also allow non-Lambda processes running on the Greengrass core (for example, a Docker container) to interact with stream manager. For more information, see Client authentication.
The snippets in this topic show you how clients call StreamManagerClient
methods to work with streams. For implementation details about the methods and their
arguments, use the links to the SDK reference listed after each snippet. For tutorials that
include a complete Python Lambda function, see Export data streams to the AWS Cloud
(console) or
Export data streams to the AWS Cloud (CLI).
Your Lambda function should instantiate StreamManagerClient
outside of the
function handler. If instantiated in the handler, the function creates a client
and connection to stream manager every time that it's invoked.
Note
If you do instantiate StreamManagerClient
in the handler, you must
explicitly call the close()
method when the client
completes
its work. Otherwise, the client
keeps the connection open and another
thread running until the script exits.
StreamManagerClient
supports the following operations:
Create message
stream
To create a stream, a user-defined Lambda function calls the create method and passes
in a MessageStreamDefinition
object. This object specifies the unique name
for the stream and defines how stream manager should handle new data when the maximum
stream size is reached. You can use MessageStreamDefinition
and its data
types (such as ExportDefinition
, StrategyOnFull
, and
Persistence
) to define other stream properties. These include:
-
The target AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise, and Amazon S3 destinations for automatic exports. For more information, see Export configurations for supported AWS Cloud destinations.
-
Export priority. Stream manager exports higher priority streams before lower priority streams.
-
Maximum batch size and batch interval for AWS IoT Analytics, Kinesis Data Streams, and AWS IoT SiteWise destinations. Stream manager exports messages when either condition is met.
-
Time-to-live (TTL). The amount of time to guarantee that the stream data is available for processing. You should make sure that the data can be consumed within this time period. This is not a deletion policy. The data might not be deleted immediately after TTL period.
-
Stream persistence. Choose to save streams to the file system to persist data across core restarts or save streams in memory.
-
Starting sequence number. Specify the sequence number of the message to use as the starting message in the export.
For more information about MessageStreamDefinition
, see the SDK reference
for your target language:
-
MessageStreamDefinition
in the Java SDK -
MessageStreamDefinition
in the Node.js SDK -
MessageStreamDefinition
in the Python SDK
Note
StreamManagerClient
also provides a target destination you can use to export streams to an HTTP server.
This target is intended for testing purposes only. It is not stable or supported for use in production environments.
After a stream is created, your Lambda functions can append messages to the stream to send data for export and read messages from the stream for local processing. The number of streams that you create depends on your hardware capabilities and business case. One strategy is to create a stream for each target channel in AWS IoT Analytics or Kinesis data stream, though you can define multiple targets for a stream. A stream has a durable lifespan.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
Note
Creating streams with an AWS IoT SiteWise or Amazon S3 export destination has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.11.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0
Examples
The following snippet creates a stream named StreamName
. It defines
stream properties in the MessageStreamDefinition
and subordinate data
types.
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: create_message_stream
For more information about configuring export destinations, see Export configurations for supported AWS Cloud destinations.
Append message
To send data to stream manager for export, your Lambda functions append the data to the target stream. The export destination determines the data type to pass to this method.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
Note
Appending messages with an AWS IoT SiteWise or Amazon S3 export destination has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.11.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0
Examples
AWS IoT Analytics or Kinesis Data Streams export destinations
The following snippet appends a message to the stream named
StreamName
. For AWS IoT Analytics or Kinesis Data Streams destinations, your Lambda
functions append a blob of data.
This snippet has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: append_message
AWS IoT SiteWise export destinations
The following snippet appends a message to the stream named
StreamName
. For AWS IoT SiteWise destinations, your Lambda functions
append a serialized PutAssetPropertyValueEntry
object. For more
information, see Exporting to AWS IoT SiteWise.
Note
When you send data to AWS IoT SiteWise, your data must meet the requirements of the BatchPutAssetPropertyValue
action.
For more information, see BatchPutAssetPropertyValue
in the AWS IoT SiteWise API Reference.
This snippet has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.11.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: append_message
Amazon S3 export destinations
The following snippet appends an export task to the stream named
StreamName
. For Amazon S3 destinations, your Lambda functions append
a serialized S3ExportTaskDefinition
object that contains
information about the source input file and target Amazon S3 object. If the specified
object doesn't exist, Stream Manager creates it for you. For more information, see Exporting to Amazon S3.
This snippet has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.11.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: append_message
Read messages
Read messages from a stream.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
Examples
The following snippet reads messages from the stream named
StreamName
. The read method takes an optional
ReadMessagesOptions
object that specifies the sequence number to
start reading from, the minimum and maximum numbers to read, and a timeout for
reading messages.
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: read_messages
List streams
Get the list of streams in stream manager.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
Examples
The following snippet gets a list of the streams (by name) in stream manager.
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: list_streams
Describe message
stream
Get metadata about a stream, including the stream definition, size, and export status.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
Examples
The following snippet gets metadata about the stream named
StreamName
, including the stream's definition, size, and exporter
statuses.
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: describe_message_stream
Update message
stream
Update properties of an existing stream. You might want to update a stream if your requirements change after the stream was created. For example:
-
Add a new export configuration for an AWS Cloud destination.
-
Increase the maximum size of a stream to change how data is exported or retained. For example, the stream size in combination with your strategy on full settings might result in data being deleted or rejected before stream manager can process it.
-
Pause and resume exports; for example, if export tasks are long running and you want to ration your upload data.
Your Lambda functions follow this high-level process to update a stream:
-
Update the target properties on the corresponding
MessageStreamDefinition
and subordinate objects. -
Pass in the updated
MessageStreamDefinition
. Make sure to include the complete object definitions for the updated stream. Undefined properties revert to the default values.You can specify the sequence number of the message to use as the starting message in the export.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.11.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0
Examples
The following snippet updates the stream named StreamName
. It updates
multiple properties of a stream that exports to Kinesis Data Streams.
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: updateMessageStream
Constraints for updating streams
The following constraints apply when updating streams. Unless noted in the following list, updates take effect immediately.
-
You can't update a stream's persistence. To change this behavior, delete the stream and create a stream that defines the new persistence policy.
-
You can update the maximum size of a stream only under the following conditions:
-
The maximum size must be greater or equal to the current size of the stream. To find this information, describe the stream and then check the storage status of the returned
MessageStreamInfo
object. -
The maximum size must be greater than or equal to the stream's segment size.
-
-
You can update the stream segment size to a value less than the maximum size of the stream. The updated setting applies to new segments.
-
Updates to the time to live (TTL) property apply to new append operations. If you decrease this value, stream manager might also delete existing segments that exceed the TTL.
-
Updates to the strategy on full property apply to new append operations. If you set the strategy to overwrite the oldest data, stream manager might also overwrite existing segments based on the new setting.
-
Updates to the flush on write property apply to new messages.
-
Updates to export configurations apply to new exports. The update request must include all export configurations that you want to support. Otherwise, stream manager deletes them.
-
When you update an export configuration, specify the identifier of the target export configuration.
-
To add an export configuration, specify a unique identifier for the new export configuration.
-
To delete an export configuration, omit the export configuration.
-
-
To update the starting sequence number of an export configuration in a stream, you must specify a value that's less than the latest sequence number. To find this information, describe the stream and then check the storage status of the returned
MessageStreamInfo
object.
Delete message
stream
Deletes a stream. When you delete a stream, all of the stored data for the stream is deleted from the disk.
Requirements
This operation has the following requirements:
-
Minimum AWS IoT Greengrass Core version: 1.10.0
-
Minimum AWS IoT Greengrass Core SDK version: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0
Examples
The following snippet deletes the stream named StreamName
.
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK reference: deleteMessageStream
See also
-
StreamManagerClient
in the AWS IoT Greengrass Core SDK reference: