使用 StreamManagerClient 使用串流 - AWS IoT Greengrass

AWS IoT Greengrass Version 1 於 2023 年 6 月 30 日進入延長生命週期階段。如需詳細資訊,請參閱 AWS IoT Greengrass V1 維護政策。在此日期之後, AWS IoT Greengrass V1 不會發佈提供功能、增強功能、錯誤修正或安全修補程式的更新。在 上執行的裝置 AWS IoT Greengrass V1 不會中斷,且會繼續運作並連線至雲端。我們強烈建議您遷移至 AWS IoT Greengrass Version 2 ,這會新增重要的新功能,並支援其他平台

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 StreamManagerClient 使用串流

用户定義的 Lambda 函數運行在AWS IoT Greengrass核心可以使用StreamManagerClientAWS IoT Greengrass核心開發套件創建流串流管理員,然後與流進行交互。當 Lambda 函數創建一個流時,它會定義AWS 雲端串流的目的地、優先順序以及其他匯出和資料保留政策。要將數據發送到流管理器,Lambda 函數會將數據追加到流中。如果已為串流定義匯出目的地,串流管理員會自動匯出串流。

注意

串流管理員的客户端通常是用户定義的 Lambda 函數。如果您的商業案例需要,則還可以允許在 Greengrass 核心上執行的非 LambDA 程序 (例如 Docker 容器) 與串流管理員互動。如需詳細資訊,請參閱 用戶端身分驗證

本主題中的程式碼片段顯示客户端如何在StreamManagerClient方法來處理串流。如需有關方法及其參數的實現詳細資訊,請使用每個程式碼片段後列出的 SDK 引用的鏈接。有關包含完整 Python Lambda 函數的教程,請參閲將資料串流匯出至AWS 雲端(console)或者匯出資料串流AWS 雲端(CLI)

你的 Lambda 函數應該實例化StreamManagerClient在函數處理程序之外。如果在處理常式內執行個體化,則該函數在每次叫用時都會建立 client 以及與串流管理員的連線。

注意

如果您在處理常式內將 StreamManagerClient 執行個體化,則必須在 client 完成其工作時明確呼叫 close() 方法。否則,client 會將連線保持在開啟狀態,並將另一個執行緒保持在執行狀態,直到指令碼結束為止。

StreamManagerClient 支援下列操作:

建立訊息串流

要創建一個流,用户定義的 Lambda 函數調用 create 方法並傳遞MessageStreamDefinition物件。此物件指定串流的唯一名稱,並定義串流管理員在串流達上限時應如何處理新資料。您可以使用 MessageStreamDefinition 及其資料類型 (例如 ExportDefinitionStrategyOnFullPersistence) 來定義其他串流屬性。其中包含:

  • 目標AWS IoT Analytics、Kinesis Data Streams、AWS IoT SiteWise以及用於自動導出的 Amazon S3 目標。如需詳細資訊,請參閱 導出支持的配置AWS 雲端目的地

  • 匯出優先順序。串流管理員會先匯出優先順序較高的串流,然後再匯出較低的串流。

  • 最大批次大小和批次間隔AWS IoT Analytics、Kinesis Data StreamsAWS IoT SiteWise目的地。符合任一條件時,串流管理員會匯出訊息。

  • 存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後,資料可能不會立即刪除。

  • 串流持久性。選擇此選項可將串流儲存至檔案系統,以便在核心重新啟動期間保留資料,或將串流儲存在記憶體中。

  • 起始序號。指定要在導出過程中用作起始消息的消息序列號。

如需有關 的詳細資訊MessageStreamDefinition,請參您目標語言的軟體開發套件參考:

注意

StreamManagerClient還提供了一個目標,您可以用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。此不穩定,也不支援用於生產環境中。

建立串流後,您的 Lambda 函數可以附加訊息到流以發送數據以進行導出,然後讀取訊息從流進行本地處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一項策略是在AWS IoT Analytics或 Kinesis 資料串流,儘管您可以為串流定義多個目標。串流的生命週期相當耐久。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 版本:1.4.0 | Node.js:1.6.0

注意

使用AWS IoT SiteWise或 Amazon S3 匯出目的地有下列要求:

  • 下限AWS IoT Greengrass核心版本:1.11.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.6.0 | Java:1.5.0 | Node.js:1.7.0

範例

以下程式碼片段會建立名為 StreamName 的串流。它定義了MessageStreamDefinition和從屬數據類型。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 雲端. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:創建訊息串流|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:createMessageStream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:createMessageStream|MessageStreamDefinition

如需設定匯出目的地的詳細資訊,請參導出支持的配置AWS 雲端目的地

 

附加訊息

若要將資料發送到串流管理員以便匯出,您的 Lambda 函數會將資料附加到目標串流。導出目標決定傳遞給此方法的數據類型。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java:1.4.0 | Node.js:1.6.0

注意

將消息附加到AWS IoT SiteWise或 Amazon S3 匯出目的地有下列要求:

  • 下限AWS IoT Greengrass核心版本:1.11.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.6.0 | Java:1.5.0 | Node.js:1.7.0

範例

AWS IoT Analytics或 Kinesis Data Streams 導出目的地

下列程式碼片段附加一個訊息到名為 StreamName 的串流。適用於AWS IoT Analytics或 Kinesis Data Streams 目標時,您的 Lambda 函數會附加一個數據 Blob。

此代碼段有以下要求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.5.0 版:1.4.0 | Node.js:1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:附加訊息

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:appendMessage

AWS IoT SiteWise匯出目的地

下列程式碼片段附加一個訊息到名為 StreamName 的串流。適用於AWS IoT SiteWise目標時,您的 Lambda 函數會附加一個序列化的PutAssetPropertyValueEntry物件。如需詳細資訊,請參閱 匯出至AWS IoT SiteWise

注意

當您將資料傳送到AWS IoT SiteWise,資料必須符合BatchPutAssetPropertyValue動作。如需詳細資訊,請參閱《AWS IoT SiteWise API 參考》中的 BatchPutAssetPropertyValue

此代碼段有以下要求:

  • 下限AWS IoT Greengrass核心版本:1.11.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.6.0 | Java:1.5.0 | Node.js:1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:附加訊息|PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:appendMessage|PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:appendMessage|PutAssetPropertyValueEntry

Amazon S3 匯出目的地

下列程式碼片段附加一個匯出任務到名為StreamName。對於 Amazon S3 目標,您的 Lambda 函數會附加一個序列化的S3ExportTaskDefinition物件,其中包含有關源輸入文件和目標 Amazon S3 物件的資訊。如果指定的對象不存在,流管理器將為您創建它。如需詳細資訊,請參閱 匯出至 Amazon S3

此代碼段有以下要求:

  • 下限AWS IoT Greengrass核心版本:1.11.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.6.0 | Java:Node.js:1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:附加訊息|S3 導出任務定義

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:appendMessage|S3 導出任務定義

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:appendMessage|S3 導出任務定義

 

讀取訊息

從串流讀取訊息。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.5.0 版:Node.js:1.6.0

範例

下列程式碼片段可從名為 StreamName 的串流讀取訊息。讀取方法需要一個選用的 ReadMessagesOptions 物件以指定序號,從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:讀取消息|ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:readMessages|ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:readMessages|ReadMessagesOptions

 

列出串流

獲取串流管理器中的串流清單。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 版本:Node.js:1.6.0

範例

下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:列表串流

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:listStreams

 

描述訊息串流

獲取串流相關的元數據,包括串流定義、大小和匯出狀態。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 版本:Node.js:1.6.0

範例

下列程式碼片段可獲取名為 StreamName 的串流相關的中繼資料,包括串流定義、大小和匯出工具狀態。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:描述 _ 訊息串流

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:describeMessageStream 面

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:describeMessageStream 面

 

更新訊息串流

更新現有串流的屬性。如果您的需求在創建流後發生變化,則可能需要更新流。例如:

  • 新增一個新的匯出配置對於AWS 雲端目的地。

  • 增加流的最大大小以更改導出或保留數據的方式。例如,流大小與您的完整設置策略結合使用可能會導致數據被刪除或拒絕,然後流管理器才能處理它。

  • 暫停和恢復導出;例如,如果導出任務運行時間長,並且您想要配給上載數據。

您的 Lambda 函數遵循以下高級過程來更新流:

  1. 獲取串流的描述。

  2. 更新對應的MessageStreamDefinition和從屬對象。

  3. 傳入更新的MessageStreamDefinition。確保包含已更新流的完整對象定義。未定義的屬性還原為預設值。

    您可以指定要在導出過程中用作起始消息的消息的序列號。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.11.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java 1.6.0 | Java:Node.js:1.7.0

範例

下列程式碼片段會更新名為StreamName。它更新導出到 Kinesis Data Streams 的多個屬性。

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:更新信息流程|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:更新消息流|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:更新信息流程|MessageStreamDefinition

更新流的約束

更新串流時有下列限制。除非在以下列表中註明,否則更新將立即生效。

  • 您無法更新串流的持久性。若要變更此行為,刪除串流建立串流,它定義了新的持久性策略。

  • 只有在下列條件下才能更新串流的最大大小:

    • 最大大小必須大於或等於串流的當前大小。要查找此信息,描述串流,然後檢查返回的MessageStreamInfo物件。

    • 最大大小必須大於或等於串流的段大小。

  • 您可以將流段大小更新為小於流最大大小的值。更新後的設置將應用於新段。

  • 存留時間 (TL) 屬性的更新適用於新的附加操作。如果減小此值,流管理器還可能會刪除超過 TTL 的現有段。

  • 對完整屬性策略的更新應用於新的追加操作。如果將策略設置為覆蓋最舊的數據,則流管理器還可能會根據新設置覆蓋現有數據段。

  • 對寫入時刷新屬性的更新應用於新郵件。

  • 導出配置的更新應用於新的導出。更新請求必須包括要支持的所有導出配置。否則,流管理器將刪除它們。

    • 更新導出配置時,請指定目標導出配置的標識符。

    • 要添加導出配置,請為新導出配置指定唯一標識符。

    • 要刪除導出配置,請省略導出配置。

  • 若要更新流中導出配置的起始序列號,則必須指定一個小於最新序列號的值。要查找此信息,描述串流,然後檢查返回的MessageStreamInfo物件。

 

刪除訊息串流

刪除串流。刪除串流時,磁碟中該串流的所有儲存資料都會刪除。

要求

此操作有下列需求:

  • 下限AWS IoT Greengrass核心版本:1.10.0

  • 下限AWS IoT Greengrass核心開發套件版本:Python:Java:1.4.0 | Node.js:1.6.0

範例

下列程式碼片段會刪除名為 StreamName 的串流。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python 軟體開發套件參考:deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java 軟體開發套件參考:刪除訊息串流

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js 軟體開發套件參考:deleteMessageStream

另請參閱