AWS IoT Greengrass Version 1 於 2023 年 6 月 30 日進入延長壽命階段。如需詳細資訊,請參閱 AWS IoT Greengrass V1 維護政策 。在此日期之後, AWS IoT Greengrass V1 不會發佈提供功能、增強功能、錯誤修正或安全修補程式的更新。在 上執行的裝置 AWS IoT Greengrass V1 不會中斷,並會繼續運作和連線至雲端。我們強烈建議您遷移至 AWS IoT Greengrass Version 2,這會新增重要的新功能,並支援其他平台 。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 StreamManagerClient 使用串流
用户定義的 Lambda 函數運行在AWS IoT Greengrass核心可以使用StreamManagerClient
在AWS IoT Greengrass核心開發套件創建流串流管理員,然後與流進行交互。當 Lambda 函數創建一個流時,它會定義AWS 雲端串流的目的地、優先順序以及其他匯出和資料保留政策。要將數據發送到流管理器,Lambda 函數會將數據追加到流中。如果已為串流定義匯出目的地,串流管理員會自動匯出串流。
串流管理員的客户端通常是用户定義的 Lambda 函數。如果您的商業案例需要,則還可以允許在 Greengrass 核心上執行的非 LambDA 程序 (例如 Docker 容器) 與串流管理員互動。如需詳細資訊,請參閱 用戶端身分驗證。
本主題中的程式碼片段顯示客户端如何在StreamManagerClient
方法來處理串流。如需有關方法及其參數的實現詳細資訊,請使用每個程式碼片段後列出的 SDK 引用的鏈接。有關包含完整 Python Lambda 函數的教程,請參閲將資料串流匯出至AWS 雲端(console)或者匯出資料串流AWS 雲端(CLI)。
你的 Lambda 函數應該實例化StreamManagerClient
在函數處理程序之外。如果在處理常式內執行個體化,則該函數在每次叫用時都會建立 client
以及與串流管理員的連線。
如果您在處理常式內將 StreamManagerClient
執行個體化,則必須在 client
完成其工作時明確呼叫 close()
方法。否則,client
會將連線保持在開啟狀態,並將另一個執行緒保持在執行狀態,直到指令碼結束為止。
StreamManagerClient
支援下列操作:
建立訊息串流
要創建一個流,用户定義的 Lambda 函數調用 create 方法並傳遞MessageStreamDefinition
物件。此物件指定串流的唯一名稱,並定義串流管理員在串流達上限時應如何處理新資料。您可以使用 MessageStreamDefinition
及其資料類型 (例如 ExportDefinition
、StrategyOnFull
和 Persistence
) 來定義其他串流屬性。其中包含:
-
目標AWS IoT Analytics、Kinesis Data Streams、AWS IoT SiteWise以及用於自動導出的 Amazon S3 目標。如需詳細資訊,請參閱 導出支持的配置AWS 雲端目的地。
-
匯出優先順序。串流管理員會先匯出優先順序較高的串流,然後再匯出較低的串流。
-
最大批次大小和批次間隔AWS IoT Analytics、Kinesis Data StreamsAWS IoT SiteWise目的地。符合任一條件時,串流管理員會匯出訊息。
-
存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後,資料可能不會立即刪除。
-
串流持久性。選擇此選項可將串流儲存至檔案系統,以便在核心重新啟動期間保留資料,或將串流儲存在記憶體中。
-
起始序號。指定要在導出過程中用作起始消息的消息序列號。
如需有關 的詳細資訊MessageStreamDefinition
,請參您目標語言的軟體開發套件參考:
StreamManagerClient
還提供了一個目標,您可以用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。此不穩定,也不支援用於生產環境中。
建立串流後,您的 Lambda 函數可以附加訊息到流以發送數據以進行導出,然後讀取訊息從流進行本地處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一項策略是在AWS IoT Analytics或 Kinesis 資料串流,儘管您可以為串流定義多個目標。串流的生命週期相當耐久。
要求
此操作有下列需求:
使用AWS IoT SiteWise或 Amazon S3 匯出目的地有下列要求:
範例
以下程式碼片段會建立名為 StreamName
的串流。它定義了MessageStreamDefinition
和從屬數據類型。
- Python
-
client = StreamManagerClient()
try:
client.create_message_stream(MessageStreamDefinition(
name="StreamName", # Required.
max_size=268435456, # Default is 256 MB.
stream_segment_size=16777216, # Default is 16 MB.
time_to_live_millis=None, # By default, no TTL is enabled.
strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required.
persistence=Persistence.File, # Default is File.
flush_on_write=False, # Default is false.
export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 雲端.
kinesis=None,
iot_analytics=None,
iot_sitewise=None,
s3_task_executor=None
)
))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:創建訊息串流|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456L) // Default is 256 MB.
.withStreamSegmentSize(16777216L) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:createMessageStream|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.createMessageStream(
new MessageStreamDefinition()
.withName("StreamName") // Required.
.withMaxSize(268435456) // Default is 256 MB.
.withStreamSegmentSize(16777216) // Default is 16 MB.
.withTimeToLiveMillis(null) // By default, no TTL is enabled.
.withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required.
.withPersistence(Persistence.File) // Default is File.
.withFlushOnWrite(false) // Default is false.
.withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 雲端.
new ExportDefinition()
.withKinesis(null)
.withIotAnalytics(null)
.withIotSitewise(null)
.withS3TaskExecutor(null)
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:createMessageStream|MessageStreamDefinition
如需設定匯出目的地的詳細資訊,請參導出支持的配置AWS 雲端目的地。
附加訊息
若要將資料發送到串流管理員以便匯出,您的 Lambda 函數會將資料附加到目標串流。導出目標決定傳遞給此方法的數據類型。
要求
此操作有下列需求:
將消息附加到AWS IoT SiteWise或 Amazon S3 匯出目的地有下列要求:
範例
AWS IoT Analytics或 Kinesis Data Streams 導出目的地
下列程式碼片段附加一個訊息到名為 StreamName
的串流。適用於AWS IoT Analytics或 Kinesis Data Streams 目標時,您的 Lambda 函數會附加一個數據 Blob。
此代碼段有以下要求:
- Python
-
client = StreamManagerClient()
try:
sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:附加訊息
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:appendMessage
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:appendMessage
AWS IoT SiteWise匯出目的地
下列程式碼片段附加一個訊息到名為 StreamName
的串流。適用於AWS IoT SiteWise目標時,您的 Lambda 函數會附加一個序列化的PutAssetPropertyValueEntry
物件。如需詳細資訊,請參閱 匯出至AWS IoT SiteWise。
此代碼段有以下要求:
- Python
-
client = StreamManagerClient()
try:
# SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
# Note: To create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(double_value=random.random())
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:附加訊息|PutAssetPropertyValueEntry
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
Random rand = new Random();
// Note: To create a new asset property data, you should use the classes defined in the
// com.amazonaws.greengrass.streammanager.model.sitewise package.
List<AssetPropertyValue> entries = new ArrayList<>() ;
// IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
final int maxTimeRandomness = 60;
final int maxOffsetRandomness = 10000;
double randomValue = rand.nextDouble();
TimeInNanos timestamp = new TimeInNanos()
.withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
.withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
AssetPropertyValue entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
entries.add(entry);
PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(UUID.randomUUID().toString())
.withPropertyAlias("PropertyAlias")
.withPropertyValues(entries);
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:appendMessage|PutAssetPropertyValueEntry
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const maxTimeRandomness = 60;
const maxOffsetRandomness = 10000;
const randomValue = Math.random();
// Note: To create a new asset property data, you should use the classes defined in the
// aws-greengrass-core-sdk StreamManager module.
const timestamp = new TimeInNanos()
.withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
.withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
const entry = new AssetPropertyValue()
.withValue(new Variant().withDoubleValue(randomValue))
.withQuality(Quality.GOOD)
.withTimestamp(timestamp);
const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
.withEntryId(`${ENTRY_ID_PREFIX}${i}`)
.withPropertyAlias("PropertyAlias")
.withPropertyValues([entry]);
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:appendMessage|PutAssetPropertyValueEntry
Amazon S3 匯出目的地
下列程式碼片段附加一個匯出任務到名為StreamName
。對於 Amazon S3 目標,您的 Lambda 函數會附加一個序列化的S3ExportTaskDefinition
物件,其中包含有關源輸入文件和目標 Amazon S3 物件的資訊。如果指定的對象不存在,流管理器將為您創建它。如需詳細資訊,請參閱 匯出至 Amazon S3。
此代碼段有以下要求:
- Python
-
client = StreamManagerClient()
try:
# Append an Amazon S3 Task definition and print the sequence number.
s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:附加訊息|S3 導出任務定義
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
// Append an Amazon S3 export task definition and print the sequence number.
S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:appendMessage|S3 導出任務定義
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
// Append an Amazon S3 export task definition and print the sequence number.
const taskDefinition = new S3ExportTaskDefinition()
.withBucket("BucketName")
.withKey("KeyName")
.withInputUrl("URLToFile");
const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:appendMessage|S3 導出任務定義
讀取訊息
從串流讀取訊息。
要求
此操作有下列需求:
範例
下列程式碼片段可從名為 StreamName
的串流讀取訊息。讀取方法需要一個選用的 ReadMessagesOptions
物件以指定序號,從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。
- Python
-
client = StreamManagerClient()
try:
message_list = client.read_messages(
stream_name="StreamName",
# By default, if no options are specified, it tries to read one message from the beginning of the stream.
options=ReadMessagesOptions(
desired_start_sequence_number=100,
# Try to read from sequence number 100 or greater. By default, this is 0.
min_message_count=10,
# Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
max_message_count=100, # Accept up to 100 messages. By default this is 1.
read_timeout_millis=5000
# Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
)
)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:讀取消息|ReadMessagesOptions
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<Message> messages = client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100L)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
.withMinMessageCount(10L)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100L)
// Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:readMessages|ReadMessagesOptions
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messages = await client.readMessages("StreamName",
// By default, if no options are specified, it tries to read one message from the beginning of the stream.
new ReadMessagesOptions()
// Try to read from sequence number 100 or greater. By default this is 0.
.withDesiredStartSequenceNumber(100)
// Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
.withMinMessageCount(10)
// Accept up to 100 messages. By default this is 1.
.withMaxMessageCount(100)
// Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
.withReadTimeoutMillis(5 * 1000)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:readMessages|ReadMessagesOptions
列出串流
獲取串流管理器中的串流清單。
要求
此操作有下列需求:
範例
下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。
- Python
-
client = StreamManagerClient()
try:
stream_names = client.list_streams()
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:列表串流
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:listStreams
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const streams = await client.listStreams();
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:listStreams
描述訊息串流
獲取串流相關的元數據,包括串流定義、大小和匯出狀態。
要求
此操作有下列需求:
範例
下列程式碼片段可獲取名為 StreamName
的串流相關的中繼資料,包括串流定義、大小和匯出工具狀態。
- Python
-
client = StreamManagerClient()
try:
stream_description = client.describe_message_stream(stream_name="StreamName")
if stream_description.export_statuses[0].error_message:
# The last export of export destination 0 failed with some error
# Here is the last sequence number that was successfully exported
stream_description.export_statuses[0].last_exported_sequence_number
if (stream_description.storage_status.newest_sequence_number >
stream_description.export_statuses[0].last_exported_sequence_number):
pass
# The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:描述 _ 訊息串流
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo description = client.describeMessageStream("StreamName");
String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.getExportStatuses().get(0).getLastExportedSequenceNumber();
}
if (description.getStorageStatus().getNewestSequenceNumber() >
description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:describeMessageStream 面
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const description = await client.describeMessageStream("StreamName");
const lastErrorMessage = description.exportStatuses[0].errorMessage;
if (lastErrorMessage) {
// The last export of export destination 0 failed with some error.
// Here is the last sequence number that was successfully exported.
description.exportStatuses[0].lastExportedSequenceNumber;
}
if (description.storageStatus.newestSequenceNumber >
description.exportStatuses[0].lastExportedSequenceNumber) {
// The end of the stream is ahead of the last exported sequence number.
}
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:describeMessageStream 面
更新訊息串流
更新現有串流的屬性。如果您的需求在創建流後發生變化,則可能需要更新流。例如:
您的 Lambda 函數遵循以下高級過程來更新流:
-
獲取串流的描述。
-
更新對應的MessageStreamDefinition
和從屬對象。
-
傳入更新的MessageStreamDefinition
。確保包含已更新流的完整對象定義。未定義的屬性還原為預設值。
您可以指定要在導出過程中用作起始消息的消息的序列號。
要求
此操作有下列需求:
範例
下列程式碼片段會更新名為StreamName
。它更新導出到 Kinesis Data Streams 的多個屬性。
- Python
-
client = StreamManagerClient()
try:
message_stream_info = client.describe_message_stream(STREAM_NAME)
message_stream_info.definition.max_size=536870912
message_stream_info.definition.stream_segment_size=33554432
message_stream_info.definition.time_to_live_millis=3600000
message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
message_stream_info.definition.persistence=Persistence.Memory
message_stream_info.definition.flush_on_write=False
message_stream_info.definition.export_definition.kinesis=
[KinesisConfig(
# Updating Export definition to add a Kinesis Stream configuration.
identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:更新信息流程|MessageStreamDefinition
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
// Update the message stream with new values.
client.updateMessageStream(
messageStreamInfo.getDefinition()
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912L) // Update Max Size to 512 MB.
.withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
.withFlushOnWrite(true) // Update flush on write to true.
.withPersistence(Persistence.Memory) // Update the persistence to Memory.
.withTimeToLiveMillis(3600000L) // Update TTL to 1 hour.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS 雲端.
messageStreamInfo.getDefinition().getExportDefinition().
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis(new ArrayList<KinesisConfig>() {{
add(new KinesisConfig()
.withIdentifier(EXPORT_IDENTIFIER)
.withKinesisStreamName("test"));
}})
);
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:更新消息流|MessageStreamDefinition
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
await client.updateMessageStream(
messageStreamInfo.definition
// Max Size update should be greater than initial Max Size defined in Create Message Stream request
.withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB.
.withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB.
.withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour.
.withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
.withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory
.withFlushOnWrite(true) // Default is false. Updating to true.
.withExportDefinition(
// Optional. Choose where/how the stream is exported to the AWS 雲端.
messageStreamInfo.definition.exportDefinition
// Updating Export definition to add a Kinesis Stream configuration.
.withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
)
);
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:更新信息流程|MessageStreamDefinition
更新流的約束
更新串流時有下列限制。除非在以下列表中註明,否則更新將立即生效。
-
您無法更新串流的持久性。若要變更此行為,刪除串流和建立串流,它定義了新的持久性策略。
-
只有在下列條件下才能更新串流的最大大小:
-
您可以將流段大小更新為小於流最大大小的值。更新後的設置將應用於新段。
-
存留時間 (TL) 屬性的更新適用於新的附加操作。如果減小此值,流管理器還可能會刪除超過 TTL 的現有段。
-
對完整屬性策略的更新應用於新的追加操作。如果將策略設置為覆蓋最舊的數據,則流管理器還可能會根據新設置覆蓋現有數據段。
-
對寫入時刷新屬性的更新應用於新郵件。
-
導出配置的更新應用於新的導出。更新請求必須包括要支持的所有導出配置。否則,流管理器將刪除它們。
-
更新導出配置時,請指定目標導出配置的標識符。
-
要添加導出配置,請為新導出配置指定唯一標識符。
-
要刪除導出配置,請省略導出配置。
-
若要更新流中導出配置的起始序列號,則必須指定一個小於最新序列號的值。要查找此信息,描述串流,然後檢查返回的MessageStreamInfo
物件。
刪除訊息串流
刪除串流。刪除串流時,磁碟中該串流的所有儲存資料都會刪除。
要求
此操作有下列需求:
範例
下列程式碼片段會刪除名為 StreamName
的串流。
- Python
-
client = StreamManagerClient()
try:
client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
pass
# Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
pass
# Properly handle errors.
Python 軟體開發套件參考:deleteMessageStream
- Java
-
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
// Properly handle exception.
}
Java 軟體開發套件參考:刪除訊息串流
- Node.js
-
const client = new StreamManagerClient();
client.onConnected(async () => {
try {
await client.deleteMessageStream("StreamName");
} catch (e) {
// Properly handle errors.
}
});
client.onError((err) => {
// Properly handle connection errors.
// This is called only when the connection to the StreamManager server fails.
});
Node.js 軟體開發套件參考:deleteMessageStream
另請參閱