As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Usar o StreamManagerClient para trabalhar com streams
Os componentes do Greengrass definidos pelo usuário em execução no dispositivo principal do Greengrass podem usar o objeto StreamManagerClient
no Stream Manager SDK para criar e interagir com os fluxos no Gerenciador de fluxos. Quando um componente cria um fluxo, ele define os destinos da Nuvem AWS, a priorização e outras políticas de exportação e de retenção de dados para o fluxo. Para enviar os dados para o Gerenciador de fluxos, os componentes anexam os dados ao fluxo. Se um destino de exportação for definido para o fluxo, o gerenciador de fluxo exportará o fluxo automaticamente.
Normalmente, os clientes do Gerenciador de fluxos são componentes do Greengrass definidos pelo usuário. Se o seu caso de negócios exigir, você também poderá permitir que os processos que não são de componente e são executados no núcleo do Greengrass (por exemplo, um contêiner do Docker) interajam com o Gerenciador de fluxos. Para ter mais informações, consulte Autenticação de cliente.
Os snippets neste tópico mostram como os clientes chamam o StreamManagerClient
para trabalhar com fluxos. Para obter detalhes de implementação sobre os métodos e seus argumentos, use os links para a referência do SDK listada após cada snippet.
Se você usa o Gerenciador de fluxos em uma função do Lambda, ela deve instanciar StreamManagerClient
fora do manipulador de função. Se instanciado no manipulador, a função cria um client
e uma conexão para o gerenciador de fluxo sempre que for invocado.
Se você instanciar StreamManagerClient
no manipulador, você deve chamar explicitamente o método close()
quando o client
concluir seu trabalho. Caso contrário, o client
mantém a conexão aberta e outro thread em execução até que o script seja encerrado.
StreamManagerClient
comporta as operações a seguir:
Criar stream de mensagens
Para criar um fluxo, um componente do Greengrass definido pelo usuário chama o método create e especifica um objeto MessageStreamDefinition
. Esse objeto especifica o nome exclusivo do fluxo e define como o gerenciador de fluxo deve lidar com novos dados quando o tamanho máximo do fluxo for atingido. Você pode usar MessageStreamDefinition
e os tipos de dados (como ExportDefinition
, StrategyOnFull
e Persistence
) para definir outras propriedades de fluxo. Isso inclui:
-
O destino AWS IoT Analytics, o Kinesis Data Streams, o AWS IoT SiteWise e os destinos do Amazon S3, para exportações automáticas. Para ter mais informações, consulte Configurações de exportação para destinos compatíveis do Nuvem AWS.
-
Prioridade da exportação. O gerenciador de fluxo exporta fluxos de prioridade mais alta antes de fluxos de prioridade mais baixa.
-
Tamanho máximo do lote e intervalo de lote para AWS IoT Analytics Kinesis Data Streams e destinos do AWS IoT SiteWise. O gerenciador de fluxo exporta mensagens quando qualquer condição é atendida.
-
Time-to-Live (TTL – Vida útil) O tempo necessário para garantir que os dados de fluxo estejam disponíveis para processamento. Você deve certificar-se de que os dados podem ser consumidos nesse período de tempo. Esta não é uma política de exclusão. Os dados podem não ser excluídos imediatamente após o período de TTL.
-
Persistência do fluxo. Selecione salvar fluxos no sistema de arquivos para persistir os dados nas reinicializações do núcleo ou salve os fluxos na memória.
-
Número de sequência inicial. Especifique o número de sequência da mensagem a ser usada como mensagem inicial na exportação.
Para obter mais informações sobre MessageStreamDefinition
, consulte a referência do SDK para a sua linguagem de destino:
O StreamManagerClient
também fornece um destino alvo que você pode usar para exportar fluxos para um servidor HTTP. Este destino deve ser usado apenas para fins de teste. Ele não é estável e nem compatível para uso em ambientes de produção.
Depois que um fluxo é criado, os componentes do Greengrass podem anexar mensagens ao fluxo para enviar dados para exportação e ler mensagens do fluxo para processamento local. O número de fluxos criados depende dos seus recursos de hardware e caso de negócios. Uma estratégia é criar um fluxo para cada canal de destino no AWS IoT Analytics ou no fluxo de dados do Kinesis, embora você possa definir vários destinos para um fluxo. Um fluxo tem longa duração.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
O snippet a seguir cria um fluxo chamado StreamName
. Ele define as propriedades de fluxo em MessageStreamDefinition
e nos tipos de dados subordinados.
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Nuvem AWS.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: create_message_stream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the Nuvem AWS.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: createMessageStream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the Nuvem AWS.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSiteWise(null)
.withS3(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: createMessageStream | MessageStreamDefinition
Para obter mais informações sobre como configurar destinos de exportação, consulte Configurações de exportação para destinos compatíveis do Nuvem AWS.
Anexar mensagem
Para enviar dados ao Gerenciador de fluxos para exportação, os componentes do Greengrass anexam os dados ao fluxo de destino. O destino da exportação determina o tipo de dados a ser passado para esse método.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
Destinos de exportação do AWS IoT Analytics ou do Kinesis Data Streams
O snippet a seguir anexa uma mensagem ao fluxo chamado StreamName
. Para os destinos do AWS IoT Analytics ou do Kinesis Data Streams, os componentes do Greengrass anexam um blob de dados.
Esse snippet tem os seguintes requisitos:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: append_message
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: appendMessage
Destinos de exportação do AWS IoT SiteWise
O snippet a seguir anexa uma mensagem ao fluxo chamado StreamName
. Para os destinos do AWS IoT SiteWise, os componentes do Greengrass anexam um objeto serializado PutAssetPropertyValueEntry
. Para ter mais informações, consulte Exportando para o AWS IoT SiteWise.
Ao enviar dados para o AWS IoT SiteWise, os dados devem atender aos requisitos da ação BatchPutAssetPropertyValue
. Para obter mais informações, consulte BatchPutAssetPropertyValue na AWS IoT SiteWise Referência de API.
Esse snippet tem os seguintes requisitos:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: append_message | PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
// than 10 minutes in the past. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: appendMessage | PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: appendMessage | PutAssetPropertyValueEntry
Destinos de exportação do Amazon S3
O snippet a seguir anexa uma tarefa de exportação ao fluxo chamada StreamName
. Para os destinos do Amazon S3, os componentes do Greengrass anexam um objeto serializado S3ExportTaskDefinition
que contém informações sobre o arquivo de entrada de origem e o objeto do Amazon S3 de destino. Se o objeto especificado não existir, o gerenciador de fluxo criará o objeto para você. Para ter mais informações, consulte Exportar para o Amazon S3.
Esse snippet tem os seguintes requisitos:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: append_message | S3ExportTaskDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: appendMessage | S3ExportTaskDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: appendMessage | S3ExportTaskDefinition
Ler Mensagens
Ler mensagens de um fluxo.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
O snippet a seguir lê mensagens do fluxo chamado StreamName
. O método de leitura usa um objeto ReadMessagesOptions
opcional que especifica o número de sequência a partir do qual começar a ler, os números mínimo e máximo a ler e um tempo limite para ler mensagens.
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: read_messages | ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: readMessages | ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: readMessages | ReadMessagesOptions
Listar streams
Obtenha a lista de fluxos no gerenciador de fluxos.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
O snippet a seguir obtém uma lista dos fluxos (por nome) no gerenciador de fluxo.
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: list_streams
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: ListStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: listStreams
Descrever stream de mensagens
Obtenha metadados sobre um fluxo, incluindo a definição, o tamanho e o status de exportação do fluxo.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
O snippet a seguir obtém metadados sobre o fluxo chamado StreamName
, incluindo a definição, o tamanho e o status do exportador do fluxo.
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: describe_message_stream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: DescribeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: uescribeMessageStream
Atualize o fluxo de mensagens
Atualize as propriedades de um fluxo existente. Talvez você queira atualizar um fluxo se seus requisitos mudarem após a criação do fluxo. Por exemplo:
-
Adicione uma nova configuração de exportação para um destino Nuvem AWS.
-
Aumente o tamanho máximo de um fluxo para alterar a forma como os dados são exportados ou retidos. Por exemplo, o tamanho do fluxo em combinação com sua estratégia em configurações completas pode resultar na exclusão ou rejeição dos dados antes que o gerenciador de fluxo possa processá-los.
-
Pause e retome as exportações; por exemplo, se as tarefas de exportação forem demoradas e você quiser racionar seus dados de upload.
Os componentes do Greengrass seguem este processo de alto nível para atualizar um fluxo:
-
Obter a descrição do fluxo.
-
Atualizar as propriedades de destino nos objetos correspondentes MessageStreamDefinition
e subordinados.
-
Passar o atualizado MessageStreamDefinition
. Certifique-se de incluir as definições completas do objeto para o fluxo atualizado. As propriedades indefinidas revertem para os valores padrão.
Você pode especificar o número de sequência da mensagem a ser usada como mensagem inicial na exportação.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
O snippet a seguir atualiza o fluxo chamado StreamName
. Ele atualiza várias propriedades de um fluxo que é exportado para o Kinesis Data Streams.
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: updateMessageStream | MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the Nuvem AWS.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: update_message_stream | MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the Nuvem AWS.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: updateMessageStream | MessageStreamDefinition
Restrições para a atualização de fluxos
As restrições a seguir se aplicam ao atualizar fluxos. A menos que indicado na lista a seguir, as atualizações entrarão em vigor imediatamente.
-
Não é possível atualizar a persistência de um fluxo. Para alterar esse comportamento, exclua o fluxo e crie um fluxo que defina a nova política de persistência.
-
Você só pode atualizar o tamanho máximo de um fluxo sob as seguintes condições:
-
O tamanho máximo deve ser maior que o tamanho atual do fluxo. Para encontrar essas informações, descreva o fluxo e, em seguida, verifique o status de armazenamento do objeto MessageStreamInfo
retornado.
-
O tamanho máximo deve ser maior ou igual ao tamanho do segmento do fluxo.
-
Você pode atualizar o tamanho do segmento do fluxo para um valor menor que o tamanho máximo do fluxo. A configuração atualizada se aplica aos novos segmentos.
-
As atualizações da propriedade tempo de vida (TTL) se aplicam às novas operações de anexação. Se você diminuir esse valor, o gerenciador de fluxo também poderá excluir segmentos existentes que excedam o TTL.
-
As atualizações da estratégia em toda a propriedade se aplicam às novas operações de anexação. Se você definir a estratégia para substituir os dados mais antigos, o gerenciador de fluxo também poderá substituir os segmentos existentes com base na nova configuração.
-
As atualizações na propriedade “descartar após gravação” se aplicam às novas mensagens.
-
As atualizações nas configurações de exportação se aplicam às novas exportações. A solicitação de atualização deve incluir todas as configurações de exportação às quais você deseja oferecer suporte. Caso contrário, o gerenciador de fluxo as excluirá.
-
Ao atualizar uma configuração de exportação, especifique o identificador da configuração de exportação de destino.
-
Para adicionar uma configuração de exportação, especifique um identificador exclusivo para a nova configuração de exportação.
-
Para excluir uma configuração de exportação, omita a configuração de exportação.
-
Para atualizar o número da sequência inicial de uma configuração de exportação em um fluxo, você deve especificar um valor menor que o número de sequência mais recente. Para encontrar essas informações, descreva o fluxo e, em seguida, verifique o status de armazenamento do objeto MessageStreamInfo
retornado.
Excluir stream de mensagens
Exclui um fluxo. Quando você exclui um fluxo, todos os dados armazenados para o fluxo são excluídos do disco.
Requisitos
Essa operação tem os seguintes requisitos:
Exemplos
O snippet a seguir exclui o fluxo chamado StreamName
.
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Referência do SDK em Python: deleteMessageStream
- Java
-
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Referência do SDK em Java: delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Referência do SDK em Node.js: deleteMessageStream
Consulte também