AWS IoT Greengrass Version 1 entered the extended life phase on June 30, 2023. For
more information, see the AWS IoT Greengrass V1 maintenance
policy. After this date, AWS IoT Greengrass V1 won't release updates that provide features,
enhancements, bug fixes, or security patches. Devices that run on AWS IoT Greengrass V1 won't be
disrupted and will continue to operate and to connect to the cloud. We strongly
recommend that you migrate to AWS IoT Greengrass Version 2,
which adds significant new
features and support for additional platforms.
Use StreamManagerClient to work with streams
User-defined Lambda functions running on the AWS IoT Greengrass core can use the
StreamManagerClient
object in the AWS IoT Greengrass Core SDK to create streams in stream
manager and then interact with the streams. When a Lambda function creates a
stream, it defines the AWS Cloud destinations, prioritization, and other export and data
retention policies for the stream. To send data to stream manager, Lambda functions append
the data to the stream. If an export destination is defined for the stream, stream manager
exports the stream automatically.
Typically, clients of stream manager are user-defined Lambda functions. If your business case requires it, you can also allow non-Lambda processes running on the Greengrass
core (for example, a Docker container) to interact with stream manager.
For more information, see Client authentication.
The snippets in this topic show you how clients call StreamManagerClient
methods to work with streams. For implementation details about the methods and their
arguments, use the links to the SDK reference listed after each snippet. For tutorials that
include a complete Python Lambda function, see Export data streams to the AWS Cloud
(console) or
Export data streams to the AWS Cloud (CLI).
Your Lambda function should instantiate StreamManagerClient outside of the function handler. If instantiated in the handler, the function creates a client and connection to stream manager every time that it's invoked.
If you do instantiate StreamManagerClient in the handler, you must explicitly call the close() method when the client completes its work. Otherwise, the client keeps the connection open and another thread running until the script exits.
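The difference between the two placements can be illustrated without a Greengrass core. The following is a minimal sketch, not SDK code: FakeStreamManagerClient is a hypothetical stand-in that only counts how many times it is constructed, and the handler names are assumptions made for illustration.

```python
class FakeStreamManagerClient:
    """Hypothetical stand-in for StreamManagerClient; it only counts connections."""

    connections_opened = 0

    def __init__(self):
        # The real client would open a connection to stream manager here.
        FakeStreamManagerClient.connections_opened += 1
        self.closed = False

    def close(self):
        self.closed = True


# Created once, when the module is loaded, outside of the handler.
client = FakeStreamManagerClient()


def function_handler(event, context):
    # Reuses the module-level client on every invocation.
    return client


def handler_with_local_client(event, context):
    # A new client per invocation, so it must be closed explicitly.
    local_client = FakeStreamManagerClient()
    try:
        return "done"
    finally:
        local_client.close()


for _ in range(3):
    function_handler({}, None)
reused_count = FakeStreamManagerClient.connections_opened

for _ in range(3):
    handler_with_local_client({}, None)
per_call_count = FakeStreamManagerClient.connections_opened - reused_count
```

In this sketch, three invocations of the first handler share a single client, while three invocations of the second handler construct three clients.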
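StreamManagerClient supports the following operations:
-
Create message stream
-
Append message
-
Read messages
-
List streams
-
Describe message stream
-
Update message stream
-
Delete message stream
Create message stream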
To create a stream, a user-defined Lambda function calls the create method and passes in a MessageStreamDefinition object. This object specifies the unique name for the stream and defines how stream manager should handle new data when the maximum stream size is reached. You can use MessageStreamDefinition and its data types (such as ExportDefinition, StrategyOnFull, and Persistence) to define other stream properties. These include:
-
The target AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise, and Amazon S3 destinations for automatic exports.
For more information, see Export configurations for supported
AWS Cloud destinations.
-
Export priority. Stream manager exports higher priority streams before lower
priority streams.
-
Maximum batch size and batch interval for AWS IoT Analytics, Kinesis Data Streams, and AWS IoT SiteWise
destinations. Stream manager exports messages when either condition is
met.
-
Time-to-live (TTL). The amount of time to guarantee that the stream data is
available for processing. Make sure that the data can be consumed
within this time period. This is not a deletion policy: the data might not be
deleted immediately after the TTL period ends.
-
Stream persistence. Choose to save streams to the file system to persist data
across core restarts or save streams in memory.
-
Starting sequence number. Specify the sequence number of the message to use as
the starting message in the export.
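The "either condition" rule for batch size and batch interval in the list above can be sketched as a small predicate. This is an illustration only, not stream manager's implementation; the function name and parameters are hypothetical.

```python
def should_export(pending_count, elapsed_millis, batch_size, batch_interval_millis=None):
    """Return True when the batch size or the batch interval has been reached."""
    size_reached = pending_count >= batch_size
    interval_reached = (
        batch_interval_millis is not None
        and elapsed_millis >= batch_interval_millis
    )
    return size_reached or interval_reached


# Assumed settings for illustration: batches of 500 messages or 60 seconds.
full_batch = should_export(500, 1000, batch_size=500, batch_interval_millis=60000)
timed_out = should_export(12, 60000, batch_size=500, batch_interval_millis=60000)
still_waiting = should_export(12, 1000, batch_size=500, batch_interval_millis=60000)
```

A batch that fills quickly is exported without waiting for the interval, and a slow trickle of messages is still exported once the interval elapses.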
For more information about MessageStreamDefinition, see the SDK reference for your target language:
StreamManagerClient
also provides a target destination you can use to export streams to an HTTP server.
This target is intended for testing purposes only. It is not stable or supported for use in production environments.
After a stream is created, your Lambda functions can append messages to the stream to
send data for export and read
messages from the stream for local processing. The number of streams that you
create depends on your hardware capabilities and business case. One strategy is to
create a stream for each target channel in AWS IoT Analytics or Kinesis data stream, though you can
define multiple targets for a stream. A stream has a durable lifespan.
Requirements
This operation has the following requirements:
Creating streams with an AWS IoT SiteWise or Amazon S3 export destination has the following
requirements:
Examples
The following snippet creates a stream named StreamName. It defines stream properties in the MessageStreamDefinition and subordinate data types.
- Python
-
import asyncio

from greengrasssdk.stream_manager import (
    ExportDefinition,
    MessageStreamDefinition,
    Persistence,
    StrategyOnFull,
    StreamManagerClient,
    StreamManagerException,
)

client = StreamManagerClient()

try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS Cloud.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: create_message_stream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: createMessageStream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: createMessageStream | MessageStreamDefinition
For more information about configuring export destinations, see Export configurations for supported
AWS Cloud destinations.
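The two strategy-on-full settings that appear in this topic, OverwriteOldestData and RejectNewData, differ in what happens to an append when the stream is full. The following sketch simulates that difference with an in-memory queue. It is a simplification under stated assumptions: it counts messages rather than bytes, it ignores segments, and the names append, StreamFullError, and max_messages are hypothetical.

```python
from collections import deque


class StreamFullError(Exception):
    """Raised by this sketch when a full stream rejects new data."""


def append(stream, message, max_messages, overwrite_oldest):
    """Append a message, applying the chosen strategy when the stream is full."""
    if len(stream) >= max_messages:
        if not overwrite_oldest:
            raise StreamFullError("stream is full; new data rejected")
        stream.popleft()  # Drop the oldest message to make room.
    stream.append(message)


# Overwrite the oldest data: the newest messages always fit.
overwriting = deque()
for n in range(1, 5):
    append(overwriting, n, max_messages=3, overwrite_oldest=True)

# Reject new data: the oldest messages are kept and later appends fail.
rejecting = deque()
rejected = []
for n in range(1, 5):
    try:
        append(rejecting, n, max_messages=3, overwrite_oldest=False)
    except StreamFullError:
        rejected.append(n)
```

Which behavior fits depends on whether the newest or the oldest data matters more when exports fall behind.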
Append message
To send data to stream manager for export, your Lambda functions append the data to the
target stream. The export destination determines the data type to pass to this
method.
Requirements
This operation has the following requirements:
Appending messages with an AWS IoT SiteWise or Amazon S3 export destination has the following
requirements:
Examples
AWS IoT Analytics or Kinesis Data Streams export destinations
The following snippet appends a message to the stream named StreamName. For AWS IoT Analytics or Kinesis Data Streams destinations, your Lambda functions append a blob of data.
This snippet has the following requirements:
- Python
-
client = StreamManagerClient()

try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: append_message
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: appendMessage
AWS IoT SiteWise export destinations
The following snippet appends a message to the stream named StreamName. For AWS IoT SiteWise destinations, your Lambda functions append a serialized PutAssetPropertyValueEntry object. For more information, see Exporting to AWS IoT SiteWise.
When you send data to AWS IoT SiteWise, your data must meet the requirements of the BatchPutAssetPropertyValue action. For more information, see BatchPutAssetPropertyValue in the AWS IoT SiteWise API Reference.
This snippet has the following requirements:
- Python
-
import calendar
import random
import time
import uuid

client = StreamManagerClient()

try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.
    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60),
        offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    put_asset_property_value_entry = PutAssetPropertyValueEntry(
        entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset
    )
    sequence_number = client.append_message(
        stream_name="StreamName",
        data=Util.validate_and_serialize_to_json_bytes(put_asset_property_value_entry)
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: append_message | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: appendMessage | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        // IoT SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() * maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
        // uuidv4 is assumed to come from the uuid package: const { v4: uuidv4 } = require("uuid");
        const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(uuidv4())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: appendMessage | PutAssetPropertyValueEntry
Amazon S3 export destinations
The following snippet appends an export task to the stream named StreamName. For Amazon S3 destinations, your Lambda functions append a serialized S3ExportTaskDefinition object that contains information about the source input file and target Amazon S3 object. If the specified object doesn't exist, stream manager creates it for you. For more information, see Exporting to Amazon S3.
This snippet has the following requirements:
- Python
-
client = StreamManagerClient()

try:
    # Append an Amazon S3 export task definition and keep the returned sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(
        stream_name="StreamName",
        data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: append_message | S3ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: appendMessage | S3ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        // Append an Amazon S3 export task definition and keep the returned sequence number.
        const taskDefinition = new S3ExportTaskDefinition()
            .withBucket("BucketName")
            .withKey("KeyName")
            .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: appendMessage | S3ExportTaskDefinition
Read messages
Read messages from a stream.
Requirements
This operation has the following requirements:
Examples
The following snippet reads messages from the stream named StreamName. The read method takes an optional ReadMessagesOptions object that specifies the sequence number to start reading from, the minimum and maximum number of messages to read, and a timeout for reading messages.
- Python
-
client = StreamManagerClient()

try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            # Try to read from sequence number 100 or greater. By default, this is 0.
            desired_start_sequence_number=100,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            min_message_count=10,
            # Accept up to 100 messages. By default, this is 1.
            max_message_count=100,
            # Try to wait at most 5 seconds for the min_message_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
            read_timeout_millis=5000
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: read_messages | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the minimum message count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: readMessages | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: readMessages | ReadMessagesOptions
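The way the start sequence number, minimum count, and maximum count interact in the read snippets above can be simulated on an in-memory list. This is a sketch of the semantics as described in the snippet comments, not the SDK's implementation: the read timeout is not modeled, and read_messages and NotEnoughMessagesError here are local stand-ins.

```python
class NotEnoughMessagesError(Exception):
    """Local stand-in for the SDK's NotEnoughMessagesException."""


def read_messages(messages, desired_start_sequence_number=0,
                  min_message_count=1, max_message_count=1):
    """Read from a list of (sequence_number, payload) tuples sorted by sequence number."""
    available = [m for m in messages if m[0] >= desired_start_sequence_number]
    if len(available) < min_message_count:
        raise NotEnoughMessagesError(
            f"wanted at least {min_message_count}, found {len(available)}"
        )
    return available[:max_message_count]


# A stream holding sequence numbers 0 through 104.
stream = [(n, b"payload") for n in range(105)]

# With the defaults, one message is read from the beginning of the stream.
first = read_messages(stream)

# Only 5 messages exist at or after sequence number 100, so a minimum of 3 is satisfied.
batch = read_messages(stream, desired_start_sequence_number=100,
                      min_message_count=3, max_message_count=100)
```

Asking for a minimum of 10 messages from sequence number 100 would raise the stand-in exception in this sketch, because only 5 are available.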
List streams
Get the list of streams in stream manager.
Requirements
This operation has the following requirements:
Examples
The following snippet gets a list of the streams (by name) in stream
manager.
- Python
-
client = StreamManagerClient()

try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: list_streams
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: listStreams
Describe message stream
Get metadata about a stream, including the stream definition, size, and export
status.
Requirements
This operation has the following requirements:
Examples
The following snippet gets metadata about the stream named StreamName, including the stream's definition, size, and exporter statuses.
- Python
-
client = StreamManagerClient()

try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error.
        # Here is the last sequence number that was successfully exported.
        stream_description.export_statuses[0].last_exported_sequence_number

    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number.
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: describe_message_stream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: describeMessageStream
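The comparison made in the snippets above, newest sequence number against last exported sequence number, can be reduced to a small helper for monitoring. This is a minimal sketch that takes plain values rather than SDK objects; export_backlog, needs_attention, and the threshold of 1000 are hypothetical choices.

```python
def export_backlog(newest_sequence_number, last_exported_sequence_number):
    """Return how far the end of the stream is ahead of the last exported message."""
    return max(0, newest_sequence_number - last_exported_sequence_number)


def needs_attention(error_message, backlog, backlog_threshold=1000):
    """Flag a stream whose last export failed or whose backlog is large."""
    return bool(error_message) or backlog > backlog_threshold


backlog = export_backlog(newest_sequence_number=150, last_exported_sequence_number=100)
healthy = not needs_attention(error_message="", backlog=backlog)
```

With real data, the inputs would come from the storage status and export statuses of the stream description.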
Update message stream
Update properties of an existing stream. You might want to update a stream if your
requirements change after the stream was created. For example:
-
Add a new export
configuration for an AWS Cloud destination.
-
Increase the maximum size of a stream to change how data is exported or
retained. For example, the stream size in combination with your strategy on full
settings might result in data being deleted or rejected before stream manager
can process it.
-
Pause and resume exports; for example, if export tasks are long running and
you want to ration your upload data.
Your Lambda functions follow this high-level process to update a stream:
-
Get the description of the stream.
-
Update the target properties on the corresponding MessageStreamDefinition and subordinate objects.
-
Pass in the updated MessageStreamDefinition. Make sure to include the complete object definitions for the updated stream. Undefined properties revert to the default values.
You can specify the sequence number of the message to use as the starting
message in the export.
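The reason for starting from the described definition, rather than building a new one, can be shown with a small stand-in. This is a sketch under stated assumptions: StreamDefinition is a hypothetical dataclass that models only a few properties and is not the SDK's MessageStreamDefinition. Its defaults mirror the ones shown in the create examples.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class StreamDefinition:
    """Hypothetical stand-in for MessageStreamDefinition with a few properties."""

    name: str
    max_size: int = 268435456  # 256 MB default.
    stream_segment_size: int = 16777216  # 16 MB default.
    flush_on_write: bool = False


# Step 1: the definition as it would be returned by describing the stream.
described = StreamDefinition(name="StreamName", max_size=536870912, flush_on_write=True)

# Steps 2 and 3: change only the target property on the described definition.
complete_update = replace(described, stream_segment_size=33554432)

# Pitfall: a new definition that sets only the changed property.
# Every property left undefined falls back to its default.
partial_update = StreamDefinition(name="StreamName", stream_segment_size=33554432)
```

In this sketch, complete_update keeps the 512 MB maximum size and the flush-on-write setting, while partial_update silently returns both to their defaults.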
Requirements
This operation has the following requirements:
Examples
The following snippet updates the stream named StreamName. It updates multiple properties of a stream that exports to Kinesis Data Streams.
- Python
-
import uuid

client = StreamManagerClient()

try:
    message_stream_info = client.describe_message_stream("StreamName")
    message_stream_info.definition.max_size = 536870912  # 512 MB.
    message_stream_info.definition.stream_segment_size = 33554432  # 32 MB.
    message_stream_info.definition.time_to_live_millis = 3600000  # 1 hour.
    message_stream_info.definition.strategy_on_full = StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence = Persistence.Memory
    message_stream_info.definition.flush_on_write = False
    # Updating Export definition to add a Kinesis Stream configuration.
    message_stream_info.definition.export_definition.kinesis = [
        KinesisConfig(identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))
    ]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: update_message_stream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream("StreamName");
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS Cloud.
                messageStreamInfo.getDefinition().getExportDefinition()
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(UUID.randomUUID().toString()) // Unique identifier for the new export configuration.
                            .withKinesisStreamName("test"));
                    }})
            )
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
Java SDK reference: updateMessageStream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await client.describeMessageStream("StreamName");
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory.
                .withFlushOnWrite(true) // Default is false. Updating to true.
                .withExportDefinition(
                    // Optional. Choose where/how the stream is exported to the AWS Cloud.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        // uuidv4 is assumed to come from the uuid package: const { v4: uuidv4 } = require("uuid");
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: updateMessageStream | MessageStreamDefinition
Constraints for updating streams
The following constraints apply when updating streams. Unless noted in the
following list, updates take effect immediately.
-
You can't update a stream's persistence. To change this behavior, delete the
stream and create a
stream that defines the new persistence policy.
-
You can update the maximum size of a stream only under the following
conditions:
-
The maximum size must be greater than or equal to the current size of
the stream. To find this information, describe the stream
and then check the storage status of the returned MessageStreamInfo
object.
-
The maximum size must be greater than or equal to the stream's
segment size.
-
You can update the stream segment size to a value less than the maximum
size of the stream. The updated setting applies to new segments.
-
Updates to the time to live (TTL) property apply to new append operations.
If you decrease this value, stream manager might also delete existing
segments that exceed the TTL.
-
Updates to the strategy on full property apply to new append operations.
If you set the strategy to overwrite the oldest data, stream manager might
also overwrite existing segments based on the new setting.
-
Updates to the flush on write property apply to new messages.
-
Updates to export configurations apply to new exports. The update request
must include all export configurations that you want to support. Otherwise,
stream manager deletes them.
-
When you update an export configuration, specify the identifier of
the target export configuration.
-
To add an export configuration, specify a unique identifier for
the new export configuration.
-
To delete an export configuration, omit the export
configuration.
-
To update the starting sequence number of an export configuration in a stream, you
must specify a value that's less than the latest sequence number.
To find this information, describe the stream
and then check the storage status of the returned MessageStreamInfo
object.
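Some of the constraints above can be checked on the client side before an update request is sent. The following sketch covers only the two maximum-size conditions and the starting sequence number condition. The function names are hypothetical, and the inputs are plain integers that would come from the stream description in practice.

```python
def check_max_size_update(new_max_size, current_stream_size, stream_segment_size):
    """Return the list of violated maximum-size constraints (empty if none)."""
    problems = []
    if new_max_size < current_stream_size:
        problems.append("maximum size is less than the current size of the stream")
    if new_max_size < stream_segment_size:
        problems.append("maximum size is less than the stream's segment size")
    return problems


def check_start_sequence_number(new_start, latest_sequence_number):
    """Return True when the new starting sequence number is less than the latest one."""
    return new_start < latest_sequence_number


# Growing a 256 MB stream to 512 MB with 16 MB segments passes both size checks.
ok = check_max_size_update(536870912, current_stream_size=268435456,
                           stream_segment_size=16777216)

# Shrinking below the data already stored violates the first condition.
too_small = check_max_size_update(1048576, current_stream_size=268435456,
                                  stream_segment_size=16777216)
```

Checks like these only give earlier feedback. Stream manager remains the authority on whether an update is accepted.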
Delete message stream
Deletes a stream. When you delete a stream, all of the stored data for the stream is
deleted from the disk.
Requirements
This operation has the following requirements:
Examples
The following snippet deletes the stream named StreamName.
- Python
-
client = StreamManagerClient()

try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except (ConnectionError, asyncio.TimeoutError):
    pass
    # Properly handle errors.
Python SDK reference: delete_message_stream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java SDK reference: deleteMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js SDK reference: deleteMessageStream
See also