Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
StreamManagerClient À utiliser pour travailler avec des flux
Les composants Greengrass définis par l'utilisateur qui s'exécutent sur le périphérique principal de Greengrass peuvent utiliser StreamManagerClient
l'objet du SDK Stream Manager pour créer des flux dans le gestionnaire de flux, puis interagir avec les flux. Lorsqu'un composant crée un flux, il définit les AWS Cloud destinations, la hiérarchisation et les autres politiques d'exportation et de conservation des données pour le flux. Pour envoyer des données au gestionnaire de flux, les composants ajoutent les données au flux. Si une destination d'exportation est définie pour le flux, le gestionnaire de flux exporte le flux automatiquement.
Généralement, les clients du gestionnaire de flux sont des composants Greengrass définis par l'utilisateur. Si votre analyse de rentabilisation l'exige, vous pouvez également autoriser les processus non liés aux composants exécutés sur le cœur de Greengrass (par exemple, un conteneur Docker) à interagir avec le gestionnaire de flux. Pour plus d’informations, consultez Authentification client.
Les extraits de cette rubrique vous montrent comment les clients appellent des StreamManagerClient
méthodes pour travailler avec des flux. Pour plus de détails sur l'implémentation des méthodes et de leurs arguments, utilisez les liens vers la référence du SDK répertoriée après chaque extrait.
Si vous utilisez le gestionnaire de flux dans une fonction Lambda, votre fonction Lambda doit être instanciée StreamManagerClient
en dehors du gestionnaire de fonctions. Si la fonction est instanciée dans le gestionnaire, elle crée un client
et une connexion au gestionnaire de flux chaque fois qu'elle est appelée.
Si vous effectuez une instanciation StreamManagerClient
dans le gestionnaire, vous devez appeler explicitement la méthode close()
lorsque le client
termine son travail. Sinon, le client
maintient la connexion ouverte et un autre thread actif jusqu'à ce que le script se termine.
StreamManagerClient
prend en charge les opérations suivantes :
Créer un flux de messages
Pour créer un flux, un composant Greengrass défini par l'utilisateur appelle la méthode create et transmet un objet. MessageStreamDefinition
Cet objet spécifie le nom unique du flux et définit comment le gestionnaire de flux doit gérer les nouvelles données lorsque la taille maximale du flux est atteinte. Vous pouvez utiliser MessageStreamDefinition
et ses types de données (tels que ExportDefinition
, StrategyOnFull
et Persistence
) pour définir d'autres propriétés de flux. Il s'agit des licences suivantes :
-
La cibleAWS IoT Analytics, Kinesis Data Streams et AWS IoT SiteWise les destinations Amazon S3 pour les exportations automatiques. Pour plus d’informations, consultez Exporter les configurations pour les AWS Cloud destinations prises en charge.
-
Exportez la priorité. Le gestionnaire de flux exporte les flux de priorité supérieure avant les flux de priorité inférieure.
-
Taille de lot et intervalle de lot maximaux pour AWS IoT Analytics Kinesis Data Streams AWS IoT SiteWise et les destinations. Le gestionnaire de flux exporte les messages lorsque l'une ou l'autre des conditions est remplie.
-
T ime-to-live (TTL). Temps nécessaire pour garantir que les données du flux sont disponibles pour le traitement. Vous devez vous assurer que les données peuvent être consommées pendant cette période. Il ne s'agit pas d'une stratégie de suppression. Les données peuvent ne pas être supprimées immédiatement après la période de TTL.
-
Persistance des flux. Choisissez d'enregistrer les flux dans le système de fichiers afin de conserver les données lors des redémarrages du noyau ou d'enregistrer les flux en mémoire.
-
Numéro de séquence de départ. Spécifiez le numéro de séquence du message à utiliser comme message de départ lors de l'exportation.
Pour plus d'informationsMessageStreamDefinition
, consultez la référence du SDK pour votre langue cible :
StreamManagerClient
fournit également une destination cible que vous pouvez utiliser pour exporter des flux vers un serveur HTTP. Cette cible n'est destinée qu'à des fins de test. Il n'est pas stable ni pris en charge pour une utilisation dans des environnements de production.
Après la création d'un flux, vos composants Greengrass peuvent ajouter des messages au flux pour envoyer des données à exporter et lire les messages du flux pour un traitement local. Le nombre de flux que vous créez dépend de vos capacités matérielles et de votre analyse de rentabilisation. L'une des stratégies consiste à créer un flux pour chaque canal cible dans AWS IoT Analytics le flux de données Kinesis, bien que vous puissiez définir plusieurs cibles pour un flux. Un flux a un cycle de vie durable.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
L'extrait de code suivant crée un flux nommé StreamName
. Il définit les propriétés du flux dans les types de données MessageStreamDefinition
et les types de données subordonnés.
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : create_message_stream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : | createMessageStreamMessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS Cloud.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSiteWise(null)
.withS3(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : | createMessageStreamMessageStreamDefinition
Pour plus d'informations sur la configuration des destinations d'exportation, consultezExporter les configurations pour les AWS Cloud destinations prises en charge.
Ajouter un message
Pour envoyer des données au gestionnaire de flux à des fins d'exportation, vos composants Greengrass ajoutent les données au flux cible. La destination d'exportation détermine le type de données à transmettre à cette méthode.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
AWS IoT Analyticsou destinations d'exportation Kinesis Data Streams
L'extrait de code suivant ajoute un message au flux nommé StreamName
. Pour AWS IoT Analytics nos destinations Kinesis Data Streams, vos composants Greengrass ajoutent un blob de données.
Cet extrait de code répond aux exigences suivantes :
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : append_message
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : AppendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : AppendMessage
AWS IoT SiteWisedestinations d'exportation
L'extrait de code suivant ajoute un message au flux nommé StreamName
. Pour les AWS IoT SiteWise destinations, vos composants Greengrass ajoutent un objet sérialisé. PutAssetPropertyValueEntry
Pour plus d’informations, consultez Exportation vers AWS IoT SiteWise.
Lorsque vous envoyez des données àAWS IoT SiteWise, celles-ci doivent répondre aux exigences de l'BatchPutAssetPropertyValue
action. Pour plus d'informations, consultez BatchPutAssetPropertyValue dans la Référence d'API AWS IoT SiteWise.
Cet extrait de code répond aux exigences suivantes :
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : append_message | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
// than 10 minutes in the past. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : AppendMessage | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : AppendMessage | PutAssetPropertyValueEntry
Destinations d'exportation Amazon S3
L'extrait suivant ajoute une tâche d'exportation au flux nommé. StreamName
Pour les destinations Amazon S3, vos composants Greengrass ajoutent un S3ExportTaskDefinition
objet sérialisé contenant des informations sur le fichier d'entrée source et l'objet Amazon S3 cible. Si l'objet spécifié n'existe pas, Stream Manager le crée pour vous. Pour plus d’informations, consultez Exportation vers Amazon S3.
Cet extrait de code répond aux exigences suivantes :
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : append_message | S3 ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : AppendMessage | S3 ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : AppendMessage | S3 ExportTaskDefinition
Lire des messages
Lisez les messages d'un stream.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
L'extrait de code suivant lit les messages du flux nommé StreamName
. La méthode read utilise un objet ReadMessagesOptions
facultatif qui spécifie le numéro de séquence à partir duquel commencer la lecture, les nombres minimum et maximum à lire et un délai d'expiration pour la lecture des messages.
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : read_messages | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : ReadMessages | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : ReadMessages | ReadMessagesOptions
Afficher la liste des flux
Obtenez la liste des flux dans le gestionnaire de flux.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
L'extrait de code suivant récupère une liste des flux (par nom) dans le gestionnaire de flux.
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : list_streams
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : ListStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : ListStreams
Décrire le flux de messages
Obtenez les métadonnées relatives à un flux, notamment sa définition, sa taille et son statut d'exportation.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
L'extrait de code suivant récupère des métadonnées sur le flux nommé StreamName
, en particulier la définition, la taille et les statuts d'exportation du flux.
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : describe_message_stream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : describeMessageStream
Mettre à jour le flux de messages
Mettez à jour les propriétés d'un flux existant. Vous souhaiterez peut-être mettre à jour un flux si vos exigences changent après sa création. Par exemple :
-
Ajoutez une nouvelle configuration d'exportation pour une AWS Cloud destination.
-
Augmentez la taille maximale d'un flux pour modifier la façon dont les données sont exportées ou conservées. Par exemple, la taille du flux associée à votre stratégie en matière de paramètres complets peut entraîner la suppression ou le rejet de données avant que le gestionnaire de flux ne puisse les traiter.
-
Interrompez et reprenez les exportations, par exemple, si les tâches d'exportation sont longues et que vous souhaitez rationner vos données de téléchargement.
Vos composants Greengrass suivent ce processus de haut niveau pour mettre à jour un flux :
-
Obtenez la description du stream.
-
Mettez à jour les propriétés cibles sur les objets correspondants MessageStreamDefinition
et subordonnés.
-
Transmettez la mise à jourMessageStreamDefinition
. Assurez-vous d'inclure les définitions d'objets complètes pour le flux mis à jour. Les propriétés non définies reprennent leurs valeurs par défaut.
Vous pouvez spécifier le numéro de séquence du message à utiliser comme message de départ lors de l'exportation.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
L'extrait suivant met à jour le flux nommé. StreamName
Elle met à jour plusieurs propriétés d'un flux exporté vers Kinesis Data Streams.
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : | updateMessageStreamMessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS Cloud.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : update_message_stream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS Cloud.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : | updateMessageStreamMessageStreamDefinition
Contraintes de mise à jour des flux
Les contraintes suivantes s'appliquent lors de la mise à jour des flux. Sauf indication contraire dans la liste suivante, les mises à jour prennent effet immédiatement.
-
Vous ne pouvez pas mettre à jour la persistance d'un flux. Pour modifier ce comportement, supprimez le flux et créez un flux qui définit la nouvelle politique de persistance.
-
Vous pouvez mettre à jour la taille maximale d'un flux uniquement dans les conditions suivantes :
-
La taille maximale doit être supérieure ou égale à la taille actuelle du flux. Pour trouver ces informations, décrivez le flux, puis vérifiez l'état de stockage de l'MessageStreamInfo
objet renvoyé.
-
La taille maximale doit être supérieure ou égale à la taille du segment du flux.
-
Vous pouvez mettre à jour la taille du segment de flux à une valeur inférieure à la taille maximale du flux. Le paramètre mis à jour s'applique aux nouveaux segments.
-
Les mises à jour de la propriété Time to Live (TTL) s'appliquent aux nouvelles opérations d'ajout. Si vous diminuez cette valeur, le gestionnaire de flux peut également supprimer les segments existants qui dépassent le TTL.
-
Les mises à jour de la stratégie relative à la propriété complète s'appliquent aux nouvelles opérations d'ajout. Si vous définissez une stratégie pour remplacer les données les plus anciennes, le gestionnaire de flux peut également remplacer les segments existants en fonction du nouveau paramètre.
-
Les mises à jour de la propriété flush on write s'appliquent aux nouveaux messages.
-
Les mises à jour des configurations d'exportation s'appliquent aux nouvelles exportations. La demande de mise à jour doit inclure toutes les configurations d'exportation que vous souhaitez prendre en charge. Dans le cas contraire, le gestionnaire de flux les supprime.
-
Lorsque vous mettez à jour une configuration d'exportation, spécifiez l'identifiant de la configuration d'exportation cible.
-
Pour ajouter une configuration d'exportation, spécifiez un identifiant unique pour la nouvelle configuration d'exportation.
-
Pour supprimer une configuration d'exportation, omettez-la.
-
Pour mettre à jour le numéro de séquence de départ d'une configuration d'exportation dans un flux, vous devez spécifier une valeur inférieure au dernier numéro de séquence. Pour trouver ces informations, décrivez le flux, puis vérifiez l'état de stockage de l'MessageStreamInfo
objet renvoyé.
Supprimer le flux de messages
Supprime un flux. Lorsque vous supprimez un flux, toutes les données stockées dans le flux sont supprimées du disque.
Prérequis
Cette opération répond aux exigences suivantes :
Exemples
L'extrait de code suivant supprime le flux nommé StreamName
.
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : deleteMessageStream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Référence du SDK Java : delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Référence du SDK Node.js : deleteMessageStream
Consultez aussi