本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Greengrass 核心裝置上執行的使用者定義 Greengrass 元件可以使用 Stream Manager SDK 中的 StreamManagerClient
物件,在串流管理員中建立串流,然後與串流互動。當元件建立串流時,它會定義串流的 AWS 雲端 目的地、優先順序和其他匯出和資料保留政策。若要將資料傳送至串流管理員,元件會將資料附加至串流。如果已定義串流的匯出目的地,串流管理員會自動匯出串流。
注意
一般而言,串流管理員的用戶端是使用者定義的 Greengrass 元件。如果您的商業案例需要,您也可以允許在 Greengrass 核心 (例如 Docker 容器) 上執行的非元件程序與串流管理員互動。如需詳細資訊,請參閱用戶端身分驗證。
本主題中的程式碼片段會示範用戶端如何呼叫StreamManagerClient
方法以使用串流。如需方法及其引數的實作詳細資訊,請使用每個程式碼片段之後列出的 SDK 參考連結。
如果您在 Lambda 函數中使用串流管理員,您的 Lambda 函數應該在函數處理常式StreamManagerClient
之外執行個體化。如果在處理常式內執行個體化,則該函數在每次叫用時都會建立 client
以及與串流管理員的連線。
注意
如果您在處理常式內將 StreamManagerClient
執行個體化,則必須在 client
完成其工作時明確呼叫 close()
方法。否則,client
會將連線保持在開啟狀態,並將另一個執行緒保持在執行狀態,直到指令碼結束為止。
StreamManagerClient
支援下列操作:
建立訊息串流
若要建立串流,使用者定義的 Greengrass 元件會呼叫建立方法,並在MessageStreamDefinition
物件中傳遞。此物件會指定串流的唯一名稱,並定義串流管理員在達到串流大小上限時應如何處理新資料。您可以使用 MessageStreamDefinition
及其資料類型 (例如 ExportDefinition
、StrategyOnFull
和 Persistence
) 來定義其他串流屬性。其中包含:
-
自動匯出的目標 AWS IoT SiteWise、 AWS IoT Analytics Kinesis Data Streams 和 Amazon S3 目的地。如需詳細資訊,請參閱匯出支援 AWS 雲端 目的地的組態。
-
匯出優先順序。串流管理員會先匯出優先順序較高的串流,然後再匯出較低的串流。
-
AWS IoT Analytics、Kinesis Data Streams 和 AWS IoT SiteWise 目的地的批次大小上限和批次間隔。符合任一條件時,串流管理員會匯出訊息。
-
存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後,資料可能不會立即刪除。
-
串流持久性。選擇此選項可將串流儲存至檔案系統,以便在核心重新啟動期間保留資料,或將串流儲存在記憶體中。
-
起始序號。指定要用作匯出中起始訊息的訊息序號。
如需 的詳細資訊MessageStreamDefinition
,請參閱目標語言的 SDK 參考:
-
Java 開發套件中的 MessageStreamDefinition
-
Node.js SDK 中的 MessageStreamDefinition
-
Python SDK 中的 MessageStreamDefinition
注意
StreamManagerClient
也提供目標目的地,可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。
建立串流後,您的 Greengrass 元件可以將訊息附加到串流,以傳送資料從串流匯出和讀取訊息以進行本機處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一個策略是為 AWS IoT Analytics 或 Kinesis 資料串流中的每個目標頻道建立串流,但您可以定義串流的多個目標。串流的生命週期相當耐久。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
以下程式碼片段會建立名為 StreamName
的串流。它定義 MessageStreamDefinition
和次級資料類型中的串流屬性。
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 雲端.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:create_message_stream
如需設定匯出目的地的詳細資訊,請參閱 匯出支援 AWS 雲端 目的地的組態。
附加訊息
若要將資料傳送至串流管理員以進行匯出,您的 Greengrass 元件會將資料附加至目標串流。匯出目的地會決定要傳遞至此方法的資料類型。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
AWS IoT Analytics 或 Kinesis Data Streams 匯出目的地
下列程式碼片段附加一個訊息到名為 StreamName
的串流。對於 AWS IoT Analytics 或 Kinesis Data Streams 目的地,您的 Greengrass 元件會附加資料的 Blob。
此程式碼片段有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:end_message
AWS IoT SiteWise 匯出目的地
下列程式碼片段附加一個訊息到名為 StreamName
的串流。對於 AWS IoT SiteWise 目的地,您的 Greengrass 元件會附加序列化PutAssetPropertyValueEntry
物件。如需詳細資訊,請參閱匯出至 AWS IoT SiteWise。
注意
當您傳送資料至 時 AWS IoT SiteWise,您的資料必須符合 BatchPutAssetPropertyValue
動作的要求。如需詳細資訊,請參閱《AWS IoT SiteWise API 參考》中的 BatchPutAssetPropertyValue。
此程式碼片段有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.
Python SDK 參考:end_message
Amazon S3 匯出目的地
下列程式碼片段會將匯出任務附加至名為 的串流StreamName
。對於 Amazon S3 目的地,您的 Greengrass 元件會附加序列化S3ExportTaskDefinition
物件,其中包含來源輸入檔案和目標 Amazon S3 物件的相關資訊。如果指定的物件不存在,串流管理員會為您建立它。如需詳細資訊,請參閱匯出至 Amazon S3。
此程式碼片段有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.
Python SDK 參考:end_message
讀取訊息
從串流讀取訊息。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
下列程式碼片段可從名為 StreamName
的串流讀取訊息。讀取方法需要一個選用的 ReadMessagesOptions
物件以指定序號,從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:Read_messages
列出串流
取得串流管理員中的串流清單。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:list_streams
描述訊息串流
取得串流的中繼資料,包括串流定義、大小和匯出狀態。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
下列程式碼片段可獲取名為 StreamName
的串流相關的中繼資料,包括串流定義、大小和匯出工具狀態。
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考: describe_message_stream
更新訊息串流
更新現有串流的屬性。如果您的需求在建立串流之後變更,建議您更新串流。例如:
-
新增 AWS 雲端 目的地的新匯出組態。
-
增加串流的大小上限,以變更資料匯出或保留的方式。例如,串流大小結合您在完整設定上的策略,可能會導致資料遭到刪除或拒絕,然後串流管理員才能處理資料。
-
暫停和繼續匯出;例如,如果匯出任務長時間執行,而且您想要對上傳資料進行調整。
您的 Greengrass 元件遵循此高階程序來更新串流:
-
更新對應
MessageStreamDefinition
和次級物件的目標屬性。 -
在更新的 中傳遞
MessageStreamDefinition
。請務必包含更新串流的完整物件定義。未定義的屬性會還原為預設值。您可以指定要用作匯出中起始訊息的訊息序號。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
下列程式碼片段會更新名為 的串流StreamName
。它會更新匯出至 Kinesis Data Streams 之串流的多個屬性。
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:updateMessageStream
更新串流的限制
更新串流時,適用下列限制條件。除非下列清單中另有說明,否則更新會立即生效。
-
您無法更新串流的持久性。若要變更此行為,請刪除串流並建立定義新持久性政策的串流。
-
您只能在下列情況更新串流的大小上限:
-
大小上限必須大於或等於串流的目前大小。若要尋找此資訊,請描述串流,然後檢查傳回
MessageStreamInfo
物件的儲存狀態。 -
大小上限必須大於或等於串流的區段大小。
-
-
您可以將串流區段大小更新為小於串流大小上限的值。更新的設定會套用至新的客群。
-
存留時間 (TTL) 屬性的更新適用於新的附加操作。如果您降低此值,串流管理員也可能會刪除超過 TTL 的現有區段。
-
完整屬性策略的更新適用於新的附加操作。如果您設定策略來覆寫最舊的資料,串流管理員也可能會根據新設定覆寫現有的區段。
-
對寫入屬性排清的更新會套用至新訊息。
-
匯出組態的更新適用於新的匯出。更新請求必須包含您要支援的所有匯出組態。否則,串流管理員會刪除它們。
-
當您更新匯出組態時,請指定目標匯出組態的識別符。
-
若要新增匯出組態,請指定新匯出組態的唯一識別符。
-
若要刪除匯出組態,請省略匯出組態。
-
-
若要更新串流中匯出組態的起始序號,您必須指定小於最新序號的值。若要尋找此資訊,請描述串流,然後檢查傳回
MessageStreamInfo
物件的儲存狀態。
刪除訊息串流
刪除串流。刪除串流時,磁碟中該串流的所有儲存資料都會刪除。
要求
此操作有下列需求:
-
最低串流管理員 SDK 版本:Python:1.1.0 | Java:1.1.0 | Node.js:1.1.0
範例
下列程式碼片段會刪除名為 StreamName
的串流。
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python SDK 參考:deleteMessageStream
另請參閱
-
StreamManagerClient
串流管理員 SDK 參考中的 :