Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Les composants Greengrass définis par l'utilisateur qui s'exécutent sur le périphérique principal de Greengrass peuvent utiliser StreamManagerClient
l'objet du SDK Stream Manager pour créer des flux dans le gestionnaire de flux, puis interagir avec les flux. Lorsqu'un composant crée un flux, il définit les AWS Cloud destinations, la hiérarchisation et les autres politiques d'exportation et de conservation des données pour le flux. Pour envoyer des données au gestionnaire de flux, les composants ajoutent les données au flux. Si une destination d'exportation est définie pour le flux, le gestionnaire de flux exporte le flux automatiquement.
Note
Généralement, les clients du gestionnaire de flux sont des composants Greengrass définis par l'utilisateur. Si votre analyse de rentabilisation l'exige, vous pouvez également autoriser les processus non liés aux composants exécutés sur le cœur de Greengrass (par exemple, un conteneur Docker) à interagir avec le gestionnaire de flux. Pour de plus amples informations, veuillez consulter Authentification client.
Les extraits de cette rubrique vous montrent comment les clients appellent des StreamManagerClient
méthodes pour travailler avec des flux. Pour plus de détails sur l'implémentation des méthodes et de leurs arguments, utilisez les liens vers la référence du SDK répertoriée après chaque extrait.
Si vous utilisez le gestionnaire de flux dans une fonction Lambda, votre fonction Lambda doit être instanciée StreamManagerClient
en dehors du gestionnaire de fonctions. Si la fonction est instanciée dans le gestionnaire, elle crée un client
et une connexion au gestionnaire de flux chaque fois qu'elle est appelée.
Note
Si vous effectuez une instanciation StreamManagerClient
dans le gestionnaire, vous devez appeler explicitement la méthode close()
lorsque le client
termine son travail. Sinon, le client
maintient la connexion ouverte et un autre thread actif jusqu'à ce que le script se termine.
StreamManagerClient
prend en charge les opérations suivantes :
Créer un flux de messages
Pour créer un flux, un composant Greengrass défini par l'utilisateur appelle la méthode create et transmet un objet. MessageStreamDefinition
Cet objet spécifie le nom unique du flux et définit comment le gestionnaire de flux doit gérer les nouvelles données lorsque la taille maximale du flux est atteinte. Vous pouvez utiliser MessageStreamDefinition
et ses types de données (tels que ExportDefinition
, StrategyOnFull
et Persistence
) pour définir d'autres propriétés de flux. Il s’agit des licences suivantes :
-
La cible AWS IoT Analytics, Kinesis Data Streams et AWS IoT SiteWise les destinations Amazon S3 pour les exportations automatiques. Pour de plus amples informations, veuillez consulter Exporter les configurations pour les AWS Cloud destinations prises en charge.
-
Exportez la priorité. Le gestionnaire de flux exporte les flux de priorité supérieure avant les flux de priorité inférieure.
-
Taille de lot et intervalle de lot maximaux pour AWS IoT Analytics Kinesis Data Streams AWS IoT SiteWise et les destinations. Le gestionnaire de flux exporte les messages lorsque l'une ou l'autre des conditions est remplie.
-
Time-to-live (TTL). Temps nécessaire pour garantir que les données du flux sont disponibles pour le traitement. Vous devez vous assurer que les données peuvent être consommées pendant cette période. Il ne s'agit pas d'une stratégie de suppression. Les données peuvent ne pas être supprimées immédiatement après la période de TTL.
-
Persistance des flux. Choisissez d'enregistrer les flux dans le système de fichiers afin de conserver les données lors des redémarrages du noyau ou d'enregistrer les flux en mémoire.
-
Numéro de séquence de départ. Spécifiez le numéro de séquence du message à utiliser comme message de départ lors de l'exportation.
Pour plus d'informationsMessageStreamDefinition
, consultez la référence du SDK pour votre langue cible :
-
MessageStreamDefinition
dans le SDK Java -
MessageStreamDefinition
dans le SDK Node.js -
MessageStreamDefinition
dans le SDK Python
Note
StreamManagerClient
fournit également une destination cible que vous pouvez utiliser pour exporter des flux vers un serveur HTTP. Cette cible n'est destinée qu'à des fins de test. Il n'est pas stable ni pris en charge pour une utilisation dans des environnements de production.
Après la création d'un flux, vos composants Greengrass peuvent ajouter des messages au flux pour envoyer des données à exporter et lire les messages du flux pour un traitement local. Le nombre de flux que vous créez dépend de vos capacités matérielles et de votre analyse de rentabilisation. L'une des stratégies consiste à créer un flux pour chaque canal cible dans AWS IoT Analytics le flux de données Kinesis, bien que vous puissiez définir plusieurs cibles pour un flux. Un flux a un cycle de vie durable.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
L'extrait de code suivant crée un flux nommé StreamName
. Il définit les propriétés du flux dans les types de données MessageStreamDefinition
et les types de données subordonnés.
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS Cloud.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : create_message_stream
Pour plus d'informations sur la configuration des destinations d'exportation, consultezExporter les configurations pour les AWS Cloud destinations prises en charge.
Ajouter un message
Pour envoyer des données au gestionnaire de flux à des fins d'exportation, vos composants Greengrass ajoutent les données au flux cible. La destination d'exportation détermine le type de données à transmettre à cette méthode.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
AWS IoT Analytics ou destinations d'exportation Kinesis Data Streams
L'extrait de code suivant ajoute un message au flux nommé StreamName
. Pour AWS IoT Analytics nos destinations Kinesis Data Streams, vos composants Greengrass ajoutent un blob de données.
Cet extrait de code répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
AWS IoT SiteWise destinations d'exportation
L'extrait de code suivant ajoute un message au flux nommé StreamName
. Pour les AWS IoT SiteWise destinations, vos composants Greengrass ajoutent un objet sérialisé. PutAssetPropertyValueEntry
Pour de plus amples informations, veuillez consulter Exporter vers AWS IoT SiteWise.
Note
Lorsque vous envoyez des données à AWS IoT SiteWise, celles-ci doivent répondre aux exigences de l'BatchPutAssetPropertyValue
action. Pour plus d’informations, consultez BatchPutAssetPropertyValue dans la Référence d’API AWS IoT SiteWise .
Cet extrait de code répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.
Référence du SDK Python : append_message | PutAssetPropertyValueEntry
Destinations d'exportation Amazon S3
L'extrait suivant ajoute une tâche d'exportation au flux nommé. StreamName
Pour les destinations Amazon S3, vos composants Greengrass ajoutent un S3ExportTaskDefinition
objet sérialisé contenant des informations sur le fichier d'entrée source et l'objet Amazon S3 cible. Si l'objet spécifié n'existe pas, Stream Manager le crée pour vous. Pour de plus amples informations, veuillez consulter Exportation vers Amazon S3.
Cet extrait de code répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.
Référence du SDK Python : append_message | S3
Lire des messages
Lisez les messages d'un stream.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
L'extrait de code suivant lit les messages du flux nommé StreamName
. La méthode read utilise un objet ReadMessagesOptions
facultatif qui spécifie le numéro de séquence à partir duquel commencer la lecture, les nombres minimum et maximum à lire et un délai d'expiration pour la lecture des messages.
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : read_messages | ReadMessagesOptions
Afficher la liste des flux
Obtenez la liste des flux dans le gestionnaire de flux.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
L'extrait de code suivant récupère une liste des flux (par nom) dans le gestionnaire de flux.
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Décrire le flux de messages
Obtenez les métadonnées relatives à un flux, notamment sa définition, sa taille et son statut d'exportation.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
L'extrait de code suivant récupère des métadonnées sur le flux nommé StreamName
, en particulier la définition, la taille et les statuts d'exportation du flux.
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Mettre à jour le flux de messages
Mettez à jour les propriétés d'un flux existant. Vous souhaiterez peut-être mettre à jour un flux si vos exigences changent après sa création. Par exemple :
-
Ajoutez une nouvelle configuration d'exportation pour une AWS Cloud destination.
-
Augmentez la taille maximale d'un flux pour modifier la façon dont les données sont exportées ou conservées. Par exemple, la taille du flux associée à votre stratégie en matière de paramètres complets peut entraîner la suppression ou le rejet de données avant que le gestionnaire de flux ne puisse les traiter.
-
Interrompez et reprenez les exportations, par exemple, si les tâches d'exportation sont longues et que vous souhaitez rationner vos données de téléchargement.
Vos composants Greengrass suivent ce processus de haut niveau pour mettre à jour un flux :
-
Mettez à jour les propriétés cibles sur les objets correspondants
MessageStreamDefinition
et subordonnés. -
Transmettez la mise à jour
MessageStreamDefinition
. Assurez-vous d'inclure les définitions d'objets complètes pour le flux mis à jour. Les propriétés non définies reprennent leurs valeurs par défaut.Vous pouvez spécifier le numéro de séquence du message à utiliser comme message de départ lors de l'exportation.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
L'extrait suivant met à jour le flux nommé. StreamName
Elle met à jour plusieurs propriétés d'un flux exporté vers Kinesis Data Streams.
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : | updateMessageStreamMessageStreamDefinition
Contraintes de mise à jour des flux
Les contraintes suivantes s'appliquent lors de la mise à jour des flux. Sauf indication contraire dans la liste suivante, les mises à jour prennent effet immédiatement.
-
Vous ne pouvez pas mettre à jour la persistance d'un flux. Pour modifier ce comportement, supprimez le flux et créez un flux qui définit la nouvelle politique de persistance.
-
Vous pouvez mettre à jour la taille maximale d'un flux uniquement dans les conditions suivantes :
-
La taille maximale doit être supérieure ou égale à la taille actuelle du flux. Pour trouver ces informations, décrivez le flux, puis vérifiez l'état de stockage de l'
MessageStreamInfo
objet renvoyé. -
La taille maximale doit être supérieure ou égale à la taille du segment du flux.
-
-
Vous pouvez mettre à jour la taille du segment de flux à une valeur inférieure à la taille maximale du flux. Le paramètre mis à jour s'applique aux nouveaux segments.
-
Les mises à jour de la propriété Time to Live (TTL) s'appliquent aux nouvelles opérations d'ajout. Si vous diminuez cette valeur, le gestionnaire de flux peut également supprimer les segments existants qui dépassent le TTL.
-
Les mises à jour de la stratégie relative à la propriété complète s'appliquent aux nouvelles opérations d'ajout. Si vous définissez une stratégie pour remplacer les données les plus anciennes, le gestionnaire de flux peut également remplacer les segments existants en fonction du nouveau paramètre.
-
Les mises à jour de la propriété flush on write s'appliquent aux nouveaux messages.
-
Les mises à jour des configurations d'exportation s'appliquent aux nouvelles exportations. La demande de mise à jour doit inclure toutes les configurations d'exportation que vous souhaitez prendre en charge. Dans le cas contraire, le gestionnaire de flux les supprime.
-
Lorsque vous mettez à jour une configuration d'exportation, spécifiez l'identifiant de la configuration d'exportation cible.
-
Pour ajouter une configuration d'exportation, spécifiez un identifiant unique pour la nouvelle configuration d'exportation.
-
Pour supprimer une configuration d'exportation, omettez-la.
-
-
Pour mettre à jour le numéro de séquence de départ d'une configuration d'exportation dans un flux, vous devez spécifier une valeur inférieure au dernier numéro de séquence. Pour trouver ces informations, décrivez le flux, puis vérifiez l'état de stockage de l'
MessageStreamInfo
objet renvoyé.
Supprimer le flux de messages
Supprime un flux. Lorsque vous supprimez un flux, toutes les données stockées dans le flux sont supprimées du disque.
Prérequis
Cette opération répond aux exigences suivantes :
-
Version minimale du SDK Stream Manager : Python : 1.1.0 | Java : 1.1.0 | Node.js : 1.1.0
Exemples
L'extrait de code suivant supprime le flux nommé StreamName
.
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Référence du SDK Python : deleteMessageStream
Consultez aussi
-
Gérez les flux de données sur les appareils principaux de Greengrass
-
Exporter les configurations pour les AWS Cloud destinations prises en charge
-
StreamManagerClient
dans la référence du SDK Stream Manager :