Utilizar StreamManagerClient para trabajar con secuencias - AWS IoT Greengrass

AWS IoT Greengrass Version 1 entró en la fase de vida útil prolongada el 30 de junio de 2023. Para obtener más información, consulte la política de mantenimiento de AWS IoT Greengrass V1 Después de esta fecha, AWS IoT Greengrass V1 no publicará actualizaciones que proporcionen características, mejoras, correcciones de errores o parches de seguridad. Los dispositivos que se ejecuten en AWS IoT Greengrass V1 no se verán afectados y seguirán funcionando y conectándose a la nube. Le recomendamos encarecidamente que migre a AWS IoT Greengrass Version 2, ya que añade importantes características nuevas y es compatible con más plataformas.

Utilizar StreamManagerClient para trabajar con secuencias

Las funciones de Lambda definidas por el usuario que se ejecutan en el núcleo de AWS IoT Greengrass pueden utilizar el objeto StreamManagerClient en el AWS IoT Greengrass Core SDK para crear secuencias en el Administrador de flujos y luego interactuar con ellas. Cuando una función de Lambda crea una secuencia, define los destinos de Nube de AWS, las prioridades y otras políticas de exportación y retención de datos para la secuencia. Para enviar datos al administrador de flujos, las funciones de Lambda anexan los datos al flujo. Si se define un destino de exportación para el flujo, el administrador de flujos exporta la secuencia automáticamente.

nota

Normalmente, los clientes del administrador de flujos son funciones de Lambda definidas por el usuario. Si su caso de negocio lo requiere, puede permitir que los procesos no-Lambda que se ejecutan en el núcleo de Greengrass (por ejemplo, un contenedor de Docker) interactúen con el administrador de flujos. Para obtener más información, consulte Autenticación del cliente.

Los fragmentos de este tema muestran cómo los clientes llaman a los métodos de StreamManagerClient para trabajar con secuencias. Para obtener detalles sobre la implementación de los métodos y sus argumentos, utilice los vínculos a la referencia del SDK de cada fragmento. Para ver tutoriales que utilizan una función de Lambda completa de Python, consulte Exportar flujos de datos a la Nube de AWS (consola) o Exportar flujos de datos a la Nube de AWS (CLI).

Debe crear una función de Lambda StreamManagerClient fuera del controlador de funciones. Si se crea una instancia en el controlador, la función crea un client y una conexión al administrador de secuencias cada vez que se invoca.

nota

Si crea una instancia StreamManagerClient en el controlador, debe llamar explícitamente al método close() cuando el client complete su trabajo. De lo contrario, el client mantiene la conexión abierta y otro subproceso en ejecución hasta que salga del script.

StreamManagerClient admite las siguientes operaciones:

Creación de una secuencia de mensajes

Para crear un flujo, una función de Lambda definida por el usuario llama al método de creación y pasa un objeto de MessageStreamDefinition. Este objeto especifica el nombre exclusivo del flujo y define cómo el administrador del flujo debe gestionar los nuevos datos cuando se alcanza el tamaño máximo de flujo. Puede utilizar MessageStreamDefinition y sus tipos de datos (como ExportDefinition, StrategyOnFull y Persistence) para definir otras propiedades de secuencias. Entre ellos se incluyen:

  • El destino AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise y Amazon S3 de destino para las exportaciones automáticas. Para obtener más información, consulte Exportación de configuraciones para destinos de Nube de AWS compatibles.

  • Prioridad de exportación. El administrador de secuencias exporta secuencias de mayor prioridad antes que secuencias de menor prioridad.

  • Tamaño e intervalo de lote máximos para AWS IoT Analytics, Kinesis Data Streams y destinos AWS IoT SiteWise. El administrador de secuencias exporta mensajes cuando se cumple cualquiera de las condiciones.

  • Time-to-Live (TTL). La cantidad de tiempo para garantizar que los datos de secuencia están disponibles para su procesamiento. Debe asegurarse de que los datos se pueden consumir dentro de este período de tiempo. Esta no es una política de eliminación. Es posible que los datos no se eliminen inmediatamente después del período TTL.

  • Persistencia de secuencia. Elija guardar las secuencias en el sistema de archivos para conservar los datos en los reinicios del núcleo o guardar las secuencias en la memoria.

  • Inicio del número de secuencia Especifique el número de secuencia del mensaje que se utilizará como mensaje de inicio en la exportación.

Para obtener más información acerca de MessageStreamDefinition, consulte la referencia del SDK para el lenguaje de destino:

nota

StreamManagerClient también proporciona un destino que puede utilizar para exportar secuencias a un servidor HTTP. Este destino está pensado solo con fines de prueba. No es estable y no se admite para su uso en entornos de producción.

Una vez creado un flujo, las funciones de Lambda pueden anexar mensajes al flujo para enviar datos para su exportación y leer los mensajes del flujo para su procesamiento local. El número de secuencias que cree depende de sus capacidades de hardware y de su caso de negocio. Una estrategia es crear una secuencia para cada canal de destino en AWS IoT Analytics o en flujos de datos de Kinesis, aunque puede definir varios destinos para una secuencia. Una secuencia tiene una vida útil duradera.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

nota

La creación de flujos con un destino de exportación AWS IoT SiteWise o Amazon S3 tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.11.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Ejemplos

El siguiente fragmento crea una secuencia denominada StreamName. Define las propiedades de las secuencias en MessageStreamDefinition y los tipos de datos subordinados.

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Nube de AWS. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: create_message_stream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nube de AWS. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia de SDK de Java: createMessageStream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nube de AWS. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia de SDK de Node.js: createMessageStream | MessageStreamDefinition

Para obtener más información acerca de la configuración de destinos de exportación, consulte Exportación de configuraciones para destinos de Nube de AWS compatibles.

 

Agregar un mensaje

Para enviar datos al administrador de flujos para su exportación, las funciones de Lambda anexan los datos al flujo de destino. El destino de exportación determina el tipo de datos que se van a transferir a este método.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

nota

La adición de mensajes a un destino de exportación AWS IoT SiteWise o Amazon S3 tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.11.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Ejemplos

AWS IoT Analytics o destinos de exportación de Kinesis Data Streams

El siguiente fragmento añade un mensaje a la secuencia denominada StreamName. Para los destinos de AWS IoT Analytics o Kinesis Data Streams, las funciones de Lambda anexan una masa de datos.

Este fragmento tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: append_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia de SDK de Java: appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del Node.js SDK: appendMessage

AWS IoT SiteWise destinos de exportación

El siguiente fragmento añade un mensaje a la secuencia denominada StreamName. Para destinos AWS IoT SiteWise, las funciones de Lambda anexan un objeto serializado PutAssetPropertyValueEntry. Para obtener más información, consulte Exportación a AWS IoT SiteWise.

nota

Cuando envía datos a AWS IoT SiteWise, los datos deben cumplir todos los requisitos de la acción BatchPutAssetPropertyValue. Para obtener más información, consulte BatchPutAssetPropertyValue en la Referencia de la API AWS IoT SiteWise.

Este fragmento tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.11.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: append_message | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia del SDK de Java: appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del SDK de Node.js: appendMessage | PutAssetPropertyValueEntry

Destinos de exportación de Amazon S3

El siguiente fragmento añade una tarea de exportación a la secuencia denominada StreamName. Para los destinos de Amazon S3, las funciones de Lambda añaden un objeto S3ExportTaskDefinition serializado que contiene información sobre el archivo de entrada de origen y el objeto de Amazon S3 de destino. Si el objeto especificado no existe, el administrador de flujos lo crea por usted. Para obtener más información, consulte Exportación a Amazon S3.

Este fragmento tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.11.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: append_message | S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia del SDK de Java: appendMessage | S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del SDK de Node.js: appendMessage | S3ExportTaskDefinition

 

Lectura de mensajes

Leer mensajes de un flujo.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Ejemplos

El siguiente fragmento lee los mensajes de la secuencia denominada StreamName. El método read toma un objeto ReadMessagesOptions opcional que especifica el número de secuencia para comenzar a leer, los números mínimo y máximo para leer y un tiempo de espera para leer mensajes.

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: read_messages | ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia de SDK de Java: readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del SDK deNode.js: readMessages | ReadMessagesOptions

 

Lista de secuencias

Obtenga la lista de flujos en el administrador de flujos.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Ejemplos

El siguiente fragmento de código obtiene una lista de las secuencias (por nombre) del administrador de secuencias.

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: list_streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia de SDK de Java: listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del Node.js SDK: listStreams

 

Descripción de una secuencia de mensajes

Obtenga metadatos sobre un flujo, incluida la definición, el tamaño y el estado de exportación del flujo.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Ejemplos

El siguiente fragmento de código obtiene metadatos de una secuencia llamada StreamName, como su definición, su tamaño y los estados del exportador.

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: describe_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Referencia del SDK de Java: describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del Node.js SDK: describeMessageStream

 

Actualización de un flujo de mensajes

Actualiza las propiedades de un flujo existente. Es posible que desee actualizar un flujo si sus requisitos cambian después de crearlo. Por ejemplo:

  • Agregue una nueva configuración de exportación para un destino Nube de AWS.

  • Aumente el tamaño máximo de un flujo para cambiar la forma en que se exportan o conservan los datos. Por ejemplo, el tamaño del flujo, en combinación con su estrategia en la configuración completa, puede provocar que los datos se eliminen o rechacen antes de que el administrador de flujos pueda procesarlos.

  • Pausa y reanuda las exportaciones; por ejemplo, si las tareas de exportación son prolongadas y quiere racionar los datos de carga.

Las funciones de Lambda siguen este proceso de alto nivel para actualizar un flujo:

  1. Consiga la descripción del flujo.

  2. Actualice las propiedades de destino de los objetos correspondientes MessageStreamDefinition y subordinados.

  3. Transfiera la actualización MessageStreamDefinition. Asegúrese de incluir las definiciones de objetos completas para el flujo actualizado. Las propiedades no definidas revierten a los valores predeterminados.

    Puede especificar el número de secuencia del mensaje que se utilizará como mensaje de inicio en la exportación.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.11.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.6.0 | Java: 1.5.0 | Node.js: 1.7.0

Ejemplos

El siguiente fragmento de código actualiza la secuencia llamada StreamName. Actualiza varias propiedades de un flujo que se exporta a Kinesis Data Streams.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: updateMessageStream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nube de AWS. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia del SDK de Java: create_message_stream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nube de AWS. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del SDK de Node.js: updateMessageStream | MessageStreamDefinition

Restricciones para actualizar los flujos

Se aplican las siguientes restricciones al actualizar flujos. A menos que se indique en la siguiente lista, las actualizaciones se aplican de inmediato.

  • No puede actualizar la persistencia de un flujo. Para cambiar este comportamiento, elimine el flujo y crea un flujo que defina la nueva política de persistencia.

  • Puede actualizar el tamaño máximo de una transmisión solo en las siguientes condiciones:

    • El tamaño máximo debe ser superior o igual al tamaño actual del flujo. Para encontrar esta información, describa la secuencia y, a continuación, compruebe el estado de almacenamiento del objeto MessageStreamInfo devuelto.

    • El tamaño máximo debe ser superior o igual al tamaño del segmento del flujo.

  • Puede actualizar el tamaño del segmento de la transmisión a un valor inferior al tamaño máximo del flujo. La configuración actualizada se aplica a los segmentos nuevos.

  • Las actualizaciones de la propiedad tiempo de vida (TTL) se aplican a las nuevas operaciones de anexión. Si reduce este valor, es posible que el administrador de flujos también elimine los segmentos existentes que superen el TTL.

  • Las actualizaciones de la estrategia en toda la propiedad se aplican a las nuevas operaciones de anexión. Si establece la estrategia para sobrescribir los flujos de datos más antiguos, es posible que el administrador de flujos también sobrescriba los segmentos existentes en función del nuevo ajuste.

  • Las actualizaciones de la propiedad de vaciar al escribir se aplican a los mensajes nuevos.

  • Las actualizaciones de las configuraciones de exportación se aplican a las nuevas exportaciones. La solicitud de actualización debe incluir todas las configuraciones de exportación que desee admitir. De lo contrario, el administrador de flujos las elimina.

    • Al actualizar una configuración de exportación, especifique el identificador de la configuración de exportación de destino.

    • Para añadir una configuración de exportación, especifique un identificador único para la nueva configuración de exportación.

    • Para eliminar una configuración de exportación, omita la configuración de exportación.

  • Para actualizar el número de secuencia inicial de una configuración de exportación en un flujo, debe especificar un valor inferior al último número de secuencia. Para encontrar esta información, describa la secuencia y, a continuación, compruebe el estado de almacenamiento del objeto MessageStreamInfo devuelto.

 

Eliminación de una secuencia de mensajes

Elimina un flujo. Cuando elimina una secuencia, todos los datos almacenados para la secuencia se eliminan del disco.

Requisitos

Esta operación tiene los siguientes requisitos:

  • Versión mínima de AWS IoT Greengrass Core: 1.10.0

  • Versión mínima del SDK de AWS IoT Greengrass Core: Python: 1.5.0 | Java: 1.4.0 | Node.js: 1.6.0

Ejemplos

El siguiente fragmento de código elimina la secuencia llamada StreamName.

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Referencia del SDK de Java: delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del SDK de Node.js: deleteMessageStream

Véase también