AWS IoT Greengrass Version 1 è entrato nella fase di estensione della vita utile il 30 giugno 2023. Per ulteriori informazioni, consulta la politica AWS IoT Greengrass V1 di manutenzione. Dopo questa data, AWS IoT Greengrass V1 non rilascerà aggiornamenti che forniscano funzionalità, miglioramenti, correzioni di bug o patch di sicurezza. I dispositivi che funzionano AWS IoT Greengrass V1 non subiranno interruzioni e continueranno a funzionare e a connettersi al cloud. Ti consigliamo vivamente di eseguire la migrazione a AWS IoT Greengrass Version 2, che aggiunge nuove importanti funzionalità e supporto per piattaforme aggiuntive.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Utilizza StreamManagerClient per lavorare con i flussi
Funzioni Lambda definite dall'utente in esecuzione sulAWS IoT Greengrasscore può usare ilStreamManagerClient
l'oggetto nelAWS IoT GreengrassCore SDKper creare flussi inGestore di flussie quindi interagisci con i flussi. Quando una funzione Lambda crea un flusso, definisce ilCloud AWSdestinazioni, assegnazione delle priorità e altre policy di esportazione e conservazione dei dati per il flusso. Per inviare dati a stream manager, le funzioni Lambda aggiungono i dati allo stream. Se viene definita una destinazione di esportazione per il flusso, stream manager esporta automaticamente il flusso.
In genere, i client del gestore flussi sono funzioni Lambda definite dall'utente. Se richiesto dal business case, puoi anche consentire a processi non Lambda in esecuzione sul Greengrass core (ad esempio, un container Docker) di interagire con stream manager. Per ulteriori informazioni, consulta la pagina Autenticazione client .
I frammenti di codice in questo argomento mostrano come chiamano i clientStreamManagerClient
per lavorare con i flussi. Per i dettagli di implementazione relativi ai metodi e ai loro argomenti, utilizza i collegamenti al riferimento SDK elencati dopo ogni frammento di codice. Per i tutorial che includono una funzione Python Lambda completa, vedereEsportazione di flussi di dati inCloud AWS(console)oEsportazione di flussi di dati inCloud AWS(CLI).
La funzione Lambda dovrebbe creare un'istanzaStreamManagerClient
al di fuori del gestore di funzioni. Se viene creata un'istanza nel gestore, la funzione crea un client
e una connessione al gestore flussi ogni volta che viene richiamata.
Se si esegue un'istanza StreamManagerClient
nel gestore, è necessario chiamare esplicitamente il metodo close()
quando client
completa il suo lavoro. In caso contrario, client
mantiene la connessione aperta e un altro thread in esecuzione fino alla chiusura dello script.
StreamManagerClient
supporta le seguenti operazioni:
Creazione del flusso di messaggi
Per creare un flusso, una funzione Lambda definita dall'utente chiama il metodo create e passa in unMessageStreamDefinition
oggetto. Questo oggetto specifica il nome univoco per il flusso e definisce il modo in cui stream manager deve gestire i nuovi dati quando viene raggiunta la dimensione massima del flusso. È possibile utilizzare MessageStreamDefinition
e relativi tipi di dati (ad esempio ExportDefinition
, StrategyOnFull
e Persistence
) per definire altre proprietà del flusso. Eccone alcuni:
-
L'obiettivoAWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWisee destinazioni Amazon S3 per le esportazioni automatiche. Per ulteriori informazioni, consulta la pagina Configurazioni di esportazione per supportateCloud AWSdestinazioni .
-
Priorità di esportazione. Stream manager esporta i flussi con priorità più alta prima dei flussi con priorità più bassa.
-
Dimensione del batch massima e intervallo batch perAWS IoT AnalyticsKinesis Data Streams eAWS IoT SiteWisedestinazioni. Stream manager esporta i messaggi quando viene soddisfatta una delle due condizioni.
-
Time-to-live (TTL). Il tempo necessario per garantire che i dati del flusso siano disponibili per l'elaborazione. È necessario assicurarsi che i dati possano essere utilizzati entro questo periodo di tempo. Questa non è una policy di eliminazione. È possibile che i dati non vengano eliminati immediatamente dopo il periodo TTL.
-
Persistenza del flusso. Scegliere di salvare i flussi nel file system per mantenere i dati tra riavvii core o salvare i flussi in memoria.
-
Numero di sequenza di partenza. Specificare il numero di sequenza del messaggio da utilizzare come messaggio iniziale nell'esportazione.
Per ulteriori informazioni suMessageStreamDefinition
, vedere il riferimento SDK per la lingua di destinazione:
StreamManagerClient
fornisce anche una destinazione di destinazione che è possibile utilizzare per esportare i flussi in un server HTTP. Questo target è destinato esclusivamente a scopi di test. Non è stabile o supportato per l'uso in ambienti di produzione.
Dopo aver creato un flusso, le funzioni Lambda possonoAggiunta di messaggiallo stream per inviare dati per l'esportazione elettura di messaggidal flusso per l'elaborazione locale. Il numero di flussi creati dipende dalle funzionalità hardware e dal business case. Una strategia consiste nel creare un flusso per ogni canale di destinazione inAWS IoT Analyticso flusso di dati Kinesis, anche se è possibile definire più target per un flusso. Un flusso ha una lunga durata.
Requisiti
Questa operazione ha i requisiti seguenti:
Creazione di flussi con unAWS IoT SiteWiseo la destinazione di esportazione Amazon S3 ha i requisiti seguenti:
Esempi
Il frammento di codice seguente crea un flusso denominato StreamName
. Definisce le proprietà del flusso nelMessageStreamDefinition
e tipi di dati subordinati.
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the Cloud AWS.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:create_message_stream|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the Cloud AWS.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:createMessageStream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the Cloud AWS.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:createMessageStream|MessageStreamDefinition
Per ulteriori informazioni sulla configurazione delle destinazioni di esportazione, consultaConfigurazioni di esportazione per supportateCloud AWSdestinazioni.
Aggiunta di un messaggio
Per inviare i dati al gestore flussi per l'esportazione, le funzioni Lambda aggiungono i dati al flusso di destinazione. La destinazione di esportazione determina il tipo di dati da passare a questo metodo.
Requisiti
Questa operazione ha i requisiti seguenti:
Aggiunta di messaggi con unAWS IoT SiteWiseo la destinazione di esportazione Amazon S3 ha i requisiti seguenti:
Esempi
AWS IoT Analyticso le destinazioni di esportazione di Kinesis Data Streams
Il frammento di codice seguente aggiunge un messaggio al flusso denominato StreamName
. PerAWS IoT Analyticso destinazioni Kinesis Data Streams, le tue funzioni Lambda aggiungono un blob di dati.
Questo frammento ha i requisiti seguenti:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:append_message
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:appendMessage
AWS IoT SiteWisedestinazioni di esportazione
Il frammento di codice seguente aggiunge un messaggio al flusso denominato StreamName
. PerAWS IoT SiteWisedestinazioni, le tue funzioni Lambda aggiungono un serializzatoPutAssetPropertyValueEntry
oggetto. Per ulteriori informazioni, consulta la pagina Esportazione di inAWS IoT SiteWise .
Quando si inviano dati aAWS IoT SiteWisei dati devono soddisfare i requisiti delBatchPutAssetPropertyValue
Operazione . Per ulteriori informazioni, consulta BatchPutAssetPropertyValue nella Documentazione di riferimento API AWS IoT SiteWise.
Questo frammento ha i requisiti seguenti:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:append_message|PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:appendMessage|PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:appendMessage|PutAssetPropertyValueEntry
Destinazioni di esportazione di Amazon S3
Il frammento di codice seguente aggiunge un'attività di esportazione al flusso denominatoStreamName
. Per le destinazioni Amazon S3, le tue funzioni Lambda aggiungono una serie serializzataS3ExportTaskDefinition
oggetto che contiene informazioni sul file di input di origine e sull'oggetto Amazon S3 di destinazione. Se l'oggetto specificato non esiste, Stream Manager lo crea per te. Per ulteriori informazioni, consulta la pagina Esportazione di in Amazon S3 .
Questo frammento ha i requisiti seguenti:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:append_message|Definizione attività di esportazione S3
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:appendMessage|Definizione attività di esportazione S3
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:appendMessage|Definizione attività di esportazione S3
Lettura di messaggi
Leggere i messaggi da un flusso.
Requisiti
Questa operazione ha i requisiti seguenti:
Esempi
Il frammento di codice seguente legge i messaggi dal flusso denominato StreamName
. Il metodo di lettura accetta un oggetto ReadMessagesOptions
facoltativo che specifica il numero di sequenza da cui iniziare la lettura, i numeri minimo e massimo da leggere e un timeout per la lettura dei messaggi.
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:read_messages|ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:readMessages|ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:readMessages|ReadMessagesOptions
Visualizzazione dell'elenco di flussi
Ottieni l'elenco dei flussi in stream manager.
Requisiti
Questa operazione ha i requisiti seguenti:
Esempi
Il frammento di codice seguente ottiene un elenco dei flussi (per nome) in stream manager.
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:list_streams
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:listStreams
Descrizione del flusso di messaggi
Ottieni i metadati relativi a un flusso, inclusi la definizione, le dimensioni e lo stato di esportazione del flusso.
Requisiti
Questa operazione ha i requisiti seguenti:
Esempi
Il frammento di codice seguente ottiene i metadati relativi al flusso denominato StreamName
, inclusi la definizione, le dimensioni e gli stati di esportatore del flusso.
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:describe_message_stream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:describeMessageStream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:describeMessageStream
Aggiornamento del flusso di messaggi
Aggiorna le proprietà di un flusso esistente. È possibile aggiornare un flusso se i requisiti cambiano dopo la creazione dello stream. Ad esempio:
-
Aggiungi un nuovoconfigurazione di esportazioneper unCloud AWSdestinazione.
-
Aumenta la dimensione massima di un flusso per modificare il modo in cui i dati vengono esportati o conservati. Ad esempio, la dimensione del flusso in combinazione con la strategia sulle impostazioni complete potrebbe comportare l'eliminazione o il rifiuto dei dati prima che lo stream manager possa elaborarli.
-
Metti in pausa e riprendi le esportazioni; ad esempio, se le attività di esportazione sono di lunga durata e desideri creare una razionazione dei dati di caricamento.
Le tue funzioni Lambda seguono questo processo di alto livello per aggiornare uno stream:
-
Descrizione del flusso.
-
Aggiorna le proprietà di destinazione sul corrispondenteMessageStreamDefinition
e oggetti subordinati.
-
Passa nella versione aggiornataMessageStreamDefinition
. Assicurati di includere le definizioni complete degli oggetti per il flusso aggiornato. Le proprietà non definite ripristinano i valori predefiniti.
È possibile specificare il numero di sequenza del messaggio da utilizzare come messaggio iniziale nell'esportazione.
Requisiti
Questa operazione ha i requisiti seguenti:
Esempi
Il frammento di codice seguente aggiorna il flusso denominatoStreamName
. Aggiorna più proprietà di un flusso che esporta in Kinesis Data Streams.
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:Aggiorna Message Stream|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the Cloud AWS.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:update_message_stream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the Cloud AWS.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:Aggiorna Message Stream|MessageStreamDefinition
Vincoli per l'aggiornamento dei flussi
I seguenti vincoli si applicano durante l'aggiornamento dei flussi. Se non indicato nel seguente elenco, gli aggiornamenti hanno effetto immediato.
-
Non è possibile aggiornare la persistenza di un flusso. Per modificare questo comportamento,Eliminazione del flussoecreare un flussoche definisce la nuova politica di persistenza.
-
È possibile aggiornare le dimensioni massime di un flusso solo alle seguenti condizioni:
-
La dimensione massima deve essere maggiore o uguale a quella corrente del flusso. Per trovare queste informazioni,describeil flussoe quindi controllare lo stato di archiviazione del prodotto restituitoMessageStreamInfo
oggetto.
-
La dimensione massima deve essere maggiore o uguale a quella del segmento del flusso.
-
È possibile aggiornare la dimensione del segmento di flusso con un valore inferiore alla dimensione massima del flusso. L'impostazione aggiornata si applica ai nuovi segmenti.
-
Gli aggiornamenti della proprietà time to live (TTL) si applicano alle nuove operazioni di accodamento. Se si riduce questo valore, stream manager potrebbe anche eliminare segmenti esistenti che superano il TTL.
-
Gli aggiornamenti della strategia sull'intera proprietà si applicano alle nuove operazioni di accodamento. Se si imposta la strategia per sovrascrivere i dati meno recenti, stream manager potrebbe anche sovrascrivere i segmenti esistenti in base alla nuova impostazione.
-
Gli aggiornamenti della proprietà flush on write si applicano ai nuovi messaggi.
-
Gli aggiornamenti delle configurazioni di esportazione si applicano alle nuove esportazioni. La richiesta di aggiornamento deve includere tutte le configurazioni di esportazione che si desidera supportare. In caso contrario, stream manager li elimina.
-
Quando si aggiorna una configurazione di esportazione, specificare l'identificatore della configurazione di esportazione di destinazione.
-
Per aggiungere una configurazione di esportazione, specificare un identificatore univoco per la nuova configurazione di esportazione.
-
Per eliminare una configurazione di esportazione, omettere la configurazione di esportazione.
-
Peraggiornareil numero di sequenza iniziale di una configurazione di esportazione in un flusso, è necessario specificare un valore inferiore all'ultimo numero di sequenza. Per trovare queste informazioni,describeil flussoe quindi controllare lo stato di archiviazione del prodotto restituitoMessageStreamInfo
oggetto.
Eliminazione del flusso di messaggi
Elimina un flusso. Quando si elimina un flusso, tutti i dati memorizzati per il flusso vengono eliminati dal disco.
Requisiti
Questa operazione ha i requisiti seguenti:
Esempi
Il frammento di codice seguente elimina il flusso denominato StreamName
.
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Riferimento SDK Python:deleteMessageStream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Riferimento SDK Java:delete_message_stream
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Riferimento SDK Node.js:deleteMessageStream
Consultare anche