Utilizza StreamManagerClient per lavorare con i flussi - AWS IoT Greengrass

AWS IoT Greengrass Version 1 è entrato nella fase di estensione della vita utile il 30 giugno 2023. Per ulteriori informazioni, consulta la politica AWS IoT Greengrass V1 di manutenzione. Dopo questa data, AWS IoT Greengrass V1 non rilascerà aggiornamenti che forniscano funzionalità, miglioramenti, correzioni di bug o patch di sicurezza. I dispositivi che funzionano AWS IoT Greengrass V1 non subiranno interruzioni e continueranno a funzionare e a connettersi al cloud. Ti consigliamo vivamente di eseguire la migrazione a AWS IoT Greengrass Version 2, che aggiunge nuove importanti funzionalità e supporto per piattaforme aggiuntive.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizza StreamManagerClient per lavorare con i flussi

Funzioni Lambda definite dall'utente in esecuzione sulAWS IoT Greengrasscore può usare ilStreamManagerClientl'oggetto nelAWS IoT GreengrassCore SDKper creare flussi inGestore di flussie quindi interagisci con i flussi. Quando una funzione Lambda crea un flusso, definisce ilCloud AWSdestinazioni, assegnazione delle priorità e altre policy di esportazione e conservazione dei dati per il flusso. Per inviare dati a stream manager, le funzioni Lambda aggiungono i dati allo stream. Se viene definita una destinazione di esportazione per il flusso, stream manager esporta automaticamente il flusso.

Nota

In genere, i client del gestore flussi sono funzioni Lambda definite dall'utente. Se richiesto dal business case, puoi anche consentire a processi non Lambda in esecuzione sul Greengrass core (ad esempio, un container Docker) di interagire con stream manager. Per ulteriori informazioni, consulta la pagina Autenticazione client .

I frammenti di codice in questo argomento mostrano come chiamano i clientStreamManagerClientper lavorare con i flussi. Per i dettagli di implementazione relativi ai metodi e ai loro argomenti, utilizza i collegamenti al riferimento SDK elencati dopo ogni frammento di codice. Per i tutorial che includono una funzione Python Lambda completa, vedereEsportazione di flussi di dati inCloud AWS(console)oEsportazione di flussi di dati inCloud AWS(CLI).

La funzione Lambda dovrebbe creare un'istanzaStreamManagerCliental di fuori del gestore di funzioni. Se viene creata un'istanza nel gestore, la funzione crea un client e una connessione al gestore flussi ogni volta che viene richiamata.

Nota

Se si esegue un'istanza StreamManagerClient nel gestore, è necessario chiamare esplicitamente il metodo close() quando client completa il suo lavoro. In caso contrario, client mantiene la connessione aperta e un altro thread in esecuzione fino alla chiusura dello script.

StreamManagerClient supporta le seguenti operazioni:

Creazione del flusso di messaggi

Per creare un flusso, una funzione Lambda definita dall'utente chiama il metodo create e passa in unMessageStreamDefinitionoggetto. Questo oggetto specifica il nome univoco per il flusso e definisce il modo in cui stream manager deve gestire i nuovi dati quando viene raggiunta la dimensione massima del flusso. È possibile utilizzare MessageStreamDefinition e relativi tipi di dati (ad esempio ExportDefinition, StrategyOnFull e Persistence) per definire altre proprietà del flusso. Eccone alcuni:

  • L'obiettivoAWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWisee destinazioni Amazon S3 per le esportazioni automatiche. Per ulteriori informazioni, consulta la pagina Configurazioni di esportazione per supportateCloud AWSdestinazioni .

  • Priorità di esportazione. Stream manager esporta i flussi con priorità più alta prima dei flussi con priorità più bassa.

  • Dimensione del batch massima e intervallo batch perAWS IoT AnalyticsKinesis Data Streams eAWS IoT SiteWisedestinazioni. Stream manager esporta i messaggi quando viene soddisfatta una delle due condizioni.

  • Time-to-live (TTL). Il tempo necessario per garantire che i dati del flusso siano disponibili per l'elaborazione. È necessario assicurarsi che i dati possano essere utilizzati entro questo periodo di tempo. Questa non è una policy di eliminazione. È possibile che i dati non vengano eliminati immediatamente dopo il periodo TTL.

  • Persistenza del flusso. Scegliere di salvare i flussi nel file system per mantenere i dati tra riavvii core o salvare i flussi in memoria.

  • Numero di sequenza di partenza. Specificare il numero di sequenza del messaggio da utilizzare come messaggio iniziale nell'esportazione.

Per ulteriori informazioni suMessageStreamDefinition, vedere il riferimento SDK per la lingua di destinazione:

Nota

StreamManagerClientfornisce anche una destinazione di destinazione che è possibile utilizzare per esportare i flussi in un server HTTP. Questo target è destinato esclusivamente a scopi di test. Non è stabile o supportato per l'uso in ambienti di produzione.

Dopo aver creato un flusso, le funzioni Lambda possonoAggiunta di messaggiallo stream per inviare dati per l'esportazione elettura di messaggidal flusso per l'elaborazione locale. Il numero di flussi creati dipende dalle funzionalità hardware e dal business case. Una strategia consiste nel creare un flusso per ogni canale di destinazione inAWS IoT Analyticso flusso di dati Kinesis, anche se è possibile definire più target per un flusso. Un flusso ha una lunga durata.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Nota

Creazione di flussi con unAWS IoT SiteWiseo la destinazione di esportazione Amazon S3 ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.11.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Esempi

Il frammento di codice seguente crea un flusso denominato StreamName. Definisce le proprietà del flusso nelMessageStreamDefinitione tipi di dati subordinati.

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Cloud AWS. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:create_message_stream|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Cloud AWS. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:createMessageStream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Cloud AWS. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:createMessageStream|MessageStreamDefinition

Per ulteriori informazioni sulla configurazione delle destinazioni di esportazione, consultaConfigurazioni di esportazione per supportateCloud AWSdestinazioni.

 

Aggiunta di un messaggio

Per inviare i dati al gestore flussi per l'esportazione, le funzioni Lambda aggiungono i dati al flusso di destinazione. La destinazione di esportazione determina il tipo di dati da passare a questo metodo.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Nota

Aggiunta di messaggi con unAWS IoT SiteWiseo la destinazione di esportazione Amazon S3 ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.11.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Esempi

AWS IoT Analyticso le destinazioni di esportazione di Kinesis Data Streams

Il frammento di codice seguente aggiunge un messaggio al flusso denominato StreamName. PerAWS IoT Analyticso destinazioni Kinesis Data Streams, le tue funzioni Lambda aggiungono un blob di dati.

Questo frammento ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:append_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:appendMessage

AWS IoT SiteWisedestinazioni di esportazione

Il frammento di codice seguente aggiunge un messaggio al flusso denominato StreamName. PerAWS IoT SiteWisedestinazioni, le tue funzioni Lambda aggiungono un serializzatoPutAssetPropertyValueEntryoggetto. Per ulteriori informazioni, consulta la pagina Esportazione di inAWS IoT SiteWise .

Nota

Quando si inviano dati aAWS IoT SiteWisei dati devono soddisfare i requisiti delBatchPutAssetPropertyValueOperazione . Per ulteriori informazioni, consulta BatchPutAssetPropertyValue nella Documentazione di riferimento API AWS IoT SiteWise.

Questo frammento ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.11.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:append_message|PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:appendMessage|PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:appendMessage|PutAssetPropertyValueEntry

Destinazioni di esportazione di Amazon S3

Il frammento di codice seguente aggiunge un'attività di esportazione al flusso denominatoStreamName. Per le destinazioni Amazon S3, le tue funzioni Lambda aggiungono una serie serializzataS3ExportTaskDefinitionoggetto che contiene informazioni sul file di input di origine e sull'oggetto Amazon S3 di destinazione. Se l'oggetto specificato non esiste, Stream Manager lo crea per te. Per ulteriori informazioni, consulta la pagina Esportazione di in Amazon S3 .

Questo frammento ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.11.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:append_message|Definizione attività di esportazione S3

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:appendMessage|Definizione attività di esportazione S3

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:appendMessage|Definizione attività di esportazione S3

 

Lettura di messaggi

Leggere i messaggi da un flusso.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Esempi

Il frammento di codice seguente legge i messaggi dal flusso denominato StreamName. Il metodo di lettura accetta un oggetto ReadMessagesOptions facoltativo che specifica il numero di sequenza da cui iniziare la lettura, i numeri minimo e massimo da leggere e un timeout per la lettura dei messaggi.

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:read_messages|ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:readMessages|ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:readMessages|ReadMessagesOptions

 

Visualizzazione dell'elenco di flussi

Ottieni l'elenco dei flussi in stream manager.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Esempi

Il frammento di codice seguente ottiene un elenco dei flussi (per nome) in stream manager.

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:list_streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:listStreams

 

Descrizione del flusso di messaggi

Ottieni i metadati relativi a un flusso, inclusi la definizione, le dimensioni e lo stato di esportazione del flusso.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Esempi

Il frammento di codice seguente ottiene i metadati relativi al flusso denominato StreamName, inclusi la definizione, le dimensioni e gli stati di esportatore del flusso.

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:describe_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:describeMessageStream

 

Aggiornamento del flusso di messaggi

Aggiorna le proprietà di un flusso esistente. È possibile aggiornare un flusso se i requisiti cambiano dopo la creazione dello stream. Ad esempio:

  • Aggiungi un nuovoconfigurazione di esportazioneper unCloud AWSdestinazione.

  • Aumenta la dimensione massima di un flusso per modificare il modo in cui i dati vengono esportati o conservati. Ad esempio, la dimensione del flusso in combinazione con la strategia sulle impostazioni complete potrebbe comportare l'eliminazione o il rifiuto dei dati prima che lo stream manager possa elaborarli.

  • Metti in pausa e riprendi le esportazioni; ad esempio, se le attività di esportazione sono di lunga durata e desideri creare una razionazione dei dati di caricamento.

Le tue funzioni Lambda seguono questo processo di alto livello per aggiornare uno stream:

  1. Descrizione del flusso.

  2. Aggiorna le proprietà di destinazione sul corrispondenteMessageStreamDefinitione oggetti subordinati.

  3. Passa nella versione aggiornataMessageStreamDefinition. Assicurati di includere le definizioni complete degli oggetti per il flusso aggiornato. Le proprietà non definite ripristinano i valori predefiniti.

    È possibile specificare il numero di sequenza del messaggio da utilizzare come messaggio iniziale nell'esportazione.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.11.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Esempi

Il frammento di codice seguente aggiorna il flusso denominatoStreamName. Aggiorna più proprietà di un flusso che esporta in Kinesis Data Streams.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:Aggiorna Message Stream|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Cloud AWS. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:update_message_stream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Cloud AWS. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:Aggiorna Message Stream|MessageStreamDefinition

Vincoli per l'aggiornamento dei flussi

I seguenti vincoli si applicano durante l'aggiornamento dei flussi. Se non indicato nel seguente elenco, gli aggiornamenti hanno effetto immediato.

  • Non è possibile aggiornare la persistenza di un flusso. Per modificare questo comportamento,Eliminazione del flussoecreare un flussoche definisce la nuova politica di persistenza.

  • È possibile aggiornare le dimensioni massime di un flusso solo alle seguenti condizioni:

    • La dimensione massima deve essere maggiore o uguale a quella corrente del flusso. Per trovare queste informazioni,describeil flussoe quindi controllare lo stato di archiviazione del prodotto restituitoMessageStreamInfooggetto.

    • La dimensione massima deve essere maggiore o uguale a quella del segmento del flusso.

  • È possibile aggiornare la dimensione del segmento di flusso con un valore inferiore alla dimensione massima del flusso. L'impostazione aggiornata si applica ai nuovi segmenti.

  • Gli aggiornamenti della proprietà time to live (TTL) si applicano alle nuove operazioni di accodamento. Se si riduce questo valore, stream manager potrebbe anche eliminare segmenti esistenti che superano il TTL.

  • Gli aggiornamenti della strategia sull'intera proprietà si applicano alle nuove operazioni di accodamento. Se si imposta la strategia per sovrascrivere i dati meno recenti, stream manager potrebbe anche sovrascrivere i segmenti esistenti in base alla nuova impostazione.

  • Gli aggiornamenti della proprietà flush on write si applicano ai nuovi messaggi.

  • Gli aggiornamenti delle configurazioni di esportazione si applicano alle nuove esportazioni. La richiesta di aggiornamento deve includere tutte le configurazioni di esportazione che si desidera supportare. In caso contrario, stream manager li elimina.

    • Quando si aggiorna una configurazione di esportazione, specificare l'identificatore della configurazione di esportazione di destinazione.

    • Per aggiungere una configurazione di esportazione, specificare un identificatore univoco per la nuova configurazione di esportazione.

    • Per eliminare una configurazione di esportazione, omettere la configurazione di esportazione.

  • Peraggiornareil numero di sequenza iniziale di una configurazione di esportazione in un flusso, è necessario specificare un valore inferiore all'ultimo numero di sequenza. Per trovare queste informazioni,describeil flussoe quindi controllare lo stato di archiviazione del prodotto restituitoMessageStreamInfooggetto.

 

Eliminazione del flusso di messaggi

Elimina un flusso. Quando si elimina un flusso, tutti i dati memorizzati per il flusso vengono eliminati dal disco.

Requisiti

Questa operazione ha i requisiti seguenti:

  • MinimoAWS IoT GreengrassVersione di Core: 1.10.0

  • MinimoAWS IoT GreengrassVersione SDK: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Esempi

Il frammento di codice seguente elimina il flusso denominato StreamName.

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Riferimento SDK Python:deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Riferimento SDK Java:delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Riferimento SDK Node.js:deleteMessageStream

Consultare anche