Use StreamManagerClient para trabalhar com streams - AWS IoT Greengrass

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Use StreamManagerClient para trabalhar com streams

Os componentes do Greengrass definidos pelo usuário que são executados no dispositivo principal do Greengrass podem usar StreamManagerClient o objeto no SDK do Stream Manager para criar fluxos no gerenciador de fluxos e depois interagir com os fluxos. Quando um componente cria um fluxo, ele define os Nuvem AWS destinos, a priorização e outras políticas de exportação e retenção de dados para o fluxo. Para enviar dados ao gerenciador de fluxo, os componentes anexam os dados ao fluxo. Se um destino de exportação for definido para o fluxo, o gerenciador de fluxo exportará o fluxo automaticamente.

nota

Normalmente, os clientes do stream manager são componentes do Greengrass definidos pelo usuário. Se seu caso de negócios exigir isso, você também pode permitir que processos sem componentes executados no núcleo do Greengrass (por exemplo, um contêiner Docker) interajam com o gerenciador de stream. Para ter mais informações, consulte Autenticação de cliente.

Os snippets neste tópico mostram como os clientes chamam o StreamManagerClient para trabalhar com fluxos. Para obter detalhes de implementação sobre os métodos e seus argumentos, use os links para a referência do SDK listada após cada snippet.

Se você usa o gerenciador de stream em uma função do Lambda, sua função do Lambda deve ser instanciada StreamManagerClient fora do manipulador da função. Se instanciado no manipulador, a função cria um client e uma conexão para o gerenciador de fluxo sempre que for invocado.

nota

Se você instanciar StreamManagerClient no manipulador, você deve chamar explicitamente o método close() quando o client concluir seu trabalho. Caso contrário, o client mantém a conexão aberta e outro thread em execução até que o script seja encerrado.

StreamManagerClient comporta as operações a seguir:

Criar stream de mensagens

Para criar um stream, um componente do Greengrass definido pelo usuário chama o método create e passa um objeto. MessageStreamDefinition Esse objeto especifica o nome exclusivo do fluxo e define como o gerenciador de fluxo deve lidar com novos dados quando o tamanho máximo do fluxo for atingido. Você pode usar MessageStreamDefinition e os tipos de dados (como ExportDefinition, StrategyOnFull e Persistence) para definir outras propriedades de fluxo. Isso inclui:

  • O destino AWS IoT Analytics, o Kinesis Data Streams, o AWS IoT SiteWise e os destinos do Amazon S3, para exportações automáticas. Para ter mais informações, consulte Configurações de exportação para destinos compatíveis do Nuvem AWS.

  • Prioridade da exportação. O gerenciador de fluxo exporta fluxos de prioridade mais alta antes de fluxos de prioridade mais baixa.

  • Tamanho máximo do lote e intervalo de lote para AWS IoT Analytics Kinesis Data Streams e destinos do AWS IoT SiteWise. O gerenciador de fluxo exporta mensagens quando qualquer condição é atendida.

  • T ime-to-live (TTL). O tempo necessário para garantir que os dados de fluxo estejam disponíveis para processamento. Você deve certificar-se de que os dados podem ser consumidos nesse período de tempo. Esta não é uma política de exclusão. Os dados podem não ser excluídos imediatamente após o período de TTL.

  • Persistência do fluxo. Selecione salvar fluxos no sistema de arquivos para persistir os dados nas reinicializações do núcleo ou salve os fluxos na memória.

  • Número de sequência inicial. Especifique o número de sequência da mensagem a ser usada como mensagem inicial na exportação.

Para obter mais informações sobre MessageStreamDefinition, consulte a referência do SDK para a sua linguagem de destino:

nota

O StreamManagerClient também fornece um destino alvo que você pode usar para exportar fluxos para um servidor HTTP. Este destino deve ser usado apenas para fins de teste. Ele não é estável e nem compatível para uso em ambientes de produção.

Depois que um stream é criado, seus componentes do Greengrass podem anexar mensagens ao stream para enviar dados para exportação e ler mensagens do stream para processamento local. O número de fluxos criados depende dos seus recursos de hardware e caso de negócios. Uma estratégia é criar um fluxo para cada canal de destino no AWS IoT Analytics ou no fluxo de dados do Kinesis, embora você possa definir vários destinos para um fluxo. Um fluxo tem longa duração.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

O snippet a seguir cria um fluxo chamado StreamName. Ele define as propriedades de fluxo em MessageStreamDefinition e nos tipos de dados subordinados.

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Nuvem AWS. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK do Python: create_message_stream | MessageStreamDefinition

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nuvem AWS. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK Java: | createMessageStreamMessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nuvem AWS. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: | createMessageStreamMessageStreamDefinition

Para obter mais informações sobre como configurar destinos de exportação, consulte Configurações de exportação para destinos compatíveis do Nuvem AWS.

Anexar mensagem

Para enviar dados ao stream manager para exportação, seus componentes do Greengrass anexam os dados ao stream de destino. O destino da exportação determina o tipo de dados a ser passado para esse método.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

Destinos de exportação do AWS IoT Analytics ou do Kinesis Data Streams

O snippet a seguir anexa uma mensagem ao fluxo chamado StreamName. Para AWS IoT Analytics nossos destinos do Kinesis Data Streams, seus componentes do Greengrass acrescentam um blob de dados.

Esse snippet tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK em Python: append_message

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK em Java: appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK em Node.js: appendMessage

Destinos de exportação do AWS IoT SiteWise

O snippet a seguir anexa uma mensagem ao fluxo chamado StreamName. Para AWS IoT SiteWise destinos, seus componentes do Greengrass acrescentam um objeto serializado. PutAssetPropertyValueEntry Para ter mais informações, consulte Exportando para o AWS IoT SiteWise.

nota

Ao enviar dados para o AWS IoT SiteWise, os dados devem atender aos requisitos da ação BatchPutAssetPropertyValue. Para obter mais informações, consulte BatchPutAssetPropertyValue na Referência da API do AWS IoT SiteWise.

Esse snippet tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK do Python: append_message | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK Java: appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: appendMessage | PutAssetPropertyValueEntry

Destinos de exportação do Amazon S3

O snippet a seguir anexa uma tarefa de exportação ao fluxo chamada StreamName. Para destinos do Amazon S3, seus componentes do Greengrass acrescentam um objeto serializado que contém informações sobre o arquivo de entrada de origem e o S3ExportTaskDefinition objeto Amazon S3 de destino. Se o objeto especificado não existir, o gerenciador de fluxo criará o objeto para você. Para ter mais informações, consulte Exportar para o Amazon S3.

Esse snippet tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK do Python: append_message | S3 ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK Java: appendMessage | S3 ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: appendMessage | S3 ExportTaskDefinition

Ler Mensagens

Ler mensagens de um fluxo.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

O snippet a seguir lê mensagens do fluxo chamado StreamName. O método de leitura usa um objeto ReadMessagesOptions opcional que especifica o número de sequência a partir do qual começar a ler, os números mínimo e máximo a ler e um tempo limite para ler mensagens.

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK do Python: read_messages | ReadMessagesOptions

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK Java: readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: readMessages | ReadMessagesOptions

Listar streams

Obtenha a lista de fluxos no gerenciador de fluxos.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

O snippet a seguir obtém uma lista dos fluxos (por nome) no gerenciador de fluxo.

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK em Python: list_streams

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK em Java: ListStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK em Node.js: listStreams

Descrever stream de mensagens

Obtenha metadados sobre um fluxo, incluindo a definição, o tamanho e o status de exportação do fluxo.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

O snippet a seguir obtém metadados sobre o fluxo chamado StreamName, incluindo a definição, o tamanho e o status do exportador do fluxo.

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK em Python: describe_message_stream

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Referência do Java SDK: describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: describeMessageStream

Atualize o fluxo de mensagens

Atualize as propriedades de um fluxo existente. Talvez você queira atualizar um fluxo se seus requisitos mudarem após a criação do fluxo. Por exemplo: .

  • Adicione uma nova configuração de exportação para um destino Nuvem AWS.

  • Aumente o tamanho máximo de um fluxo para alterar a forma como os dados são exportados ou retidos. Por exemplo, o tamanho do fluxo em combinação com sua estratégia em configurações completas pode resultar na exclusão ou rejeição dos dados antes que o gerenciador de fluxo possa processá-los.

  • Pause e retome as exportações; por exemplo, se as tarefas de exportação forem demoradas e você quiser racionar seus dados de upload.

Seus componentes do Greengrass seguem esse processo de alto nível para atualizar um stream:

  1. Obter a descrição do fluxo.

  2. Atualizar as propriedades de destino nos objetos correspondentes MessageStreamDefinition e subordinados.

  3. Passar o atualizado MessageStreamDefinition. Certifique-se de incluir as definições completas do objeto para o fluxo atualizado. As propriedades indefinidas revertem para os valores padrão.

    Você pode especificar o número de sequência da mensagem a ser usada como mensagem inicial na exportação.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

O snippet a seguir atualiza o fluxo chamado StreamName. Ele atualiza várias propriedades de um fluxo que é exportado para o Kinesis Data Streams.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK do Python: | updateMessageStreamMessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nuvem AWS. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK Java: update_message_stream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the Nuvem AWS. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: | updateMessageStreamMessageStreamDefinition

Restrições para a atualização de fluxos

As restrições a seguir se aplicam ao atualizar fluxos. A menos que indicado na lista a seguir, as atualizações entrarão em vigor imediatamente.

  • Não é possível atualizar a persistência de um fluxo. Para alterar esse comportamento, exclua o fluxo e crie um fluxo que defina a nova política de persistência.

  • Você só pode atualizar o tamanho máximo de um fluxo sob as seguintes condições:

    • O tamanho máximo deve ser maior que o tamanho atual do fluxo. Para encontrar essas informações, descreva o fluxo e, em seguida, verifique o status de armazenamento do objeto MessageStreamInfo retornado.

    • O tamanho máximo deve ser maior ou igual ao tamanho do segmento do fluxo.

  • Você pode atualizar o tamanho do segmento do fluxo para um valor menor que o tamanho máximo do fluxo. A configuração atualizada se aplica aos novos segmentos.

  • As atualizações da propriedade tempo de vida (TTL) se aplicam às novas operações de anexação. Se você diminuir esse valor, o gerenciador de fluxo também poderá excluir segmentos existentes que excedam o TTL.

  • As atualizações da estratégia em toda a propriedade se aplicam às novas operações de anexação. Se você definir a estratégia para substituir os dados mais antigos, o gerenciador de fluxo também poderá substituir os segmentos existentes com base na nova configuração.

  • As atualizações na propriedade “descartar após gravação” se aplicam às novas mensagens.

  • As atualizações nas configurações de exportação se aplicam às novas exportações. A solicitação de atualização deve incluir todas as configurações de exportação às quais você deseja oferecer suporte. Caso contrário, o gerenciador de fluxo as excluirá.

    • Ao atualizar uma configuração de exportação, especifique o identificador da configuração de exportação de destino.

    • Para adicionar uma configuração de exportação, especifique um identificador exclusivo para a nova configuração de exportação.

    • Para excluir uma configuração de exportação, omita a configuração de exportação.

  • Para atualizar o número da sequência inicial de uma configuração de exportação em um fluxo, você deve especificar um valor menor que o número de sequência mais recente. Para encontrar essas informações, descreva o fluxo e, em seguida, verifique o status de armazenamento do objeto MessageStreamInfo retornado.

Excluir stream de mensagens

Exclui um fluxo. Quando você exclui um fluxo, todos os dados armazenados para o fluxo são excluídos do disco.

Requisitos

Essa operação tem os seguintes requisitos:

  • Versão mínima do SDK do Stream Manager: Python: 1.1.0 | Java: 1.1.0 | Node.js: 1.1.0

Exemplos

O snippet a seguir exclui o fluxo chamado StreamName.

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referência do SDK do Python: deleteMessageStream

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Referência do SDK em Java: delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referência do SDK do Node.js: deleteMessageStream

Consulte também