ストリームを操作するために StreamManagerClient を使用する - AWS IoT Greengrass

AWS IoT Greengrass Version 1 は、2023 年 6 月 30 日に延長ライフフェーズに入りました。詳細については、「AWS IoT Greengrass V1 メンテナンスポリシー」を参照してください。この日以降、 AWS IoT Greengrass V1 は機能、機能強化、バグ修正、またはセキュリティパッチを提供する更新をリリースしません。で実行されるデバイスは中断 AWS IoT Greengrass V1 されず、引き続き運用され、クラウドに接続されます。への移行 AWS IoT Greengrass Version 2を強くお勧めします。これにより、重要な新機能が追加され、追加のプラットフォーム がサポートされます

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ストリームを操作するために StreamManagerClient を使用する

AWS IoT Greengrass コアで実行されているユーザー定義の Lambda 関数は、AWS IoT Greengrass Core SDKStreamManagerClient オブジェクトを使用して、ストリームマネージャーでストリームを作成し、ストリームと対話できます。Lambda 関数は、ストリームを作成する際に、ストリームの AWS クラウド 送信先、優先順位、その他のエクスポートおよびデータ保持ポリシーを定義します。また、ストリームマネージャーにデータを送信するために、データをストリームに追加します。ストリームにエクスポート先が定義されている場合、ストリームマネージャーは自動的にストリームをエクスポートします。

注記

通常、ストリームマネージャーのクライアントはユーザー定義の Lambda 関数です。ビジネスケースで必要な場合は、Greengrass コア (Docker コンテナなど) で実行されている非 Lambda プロセスがストリームマネージャーと対話できるようにすることもできます。詳細については、「クライアント承認」を参照してください。

このトピックのスニペットは、クライアントが StreamManagerClient メソッドを呼び出してストリームを操作する方法を示しています。メソッドとその引数の実装の詳細については、各スニペットの下に記載されている SDK リファレンスへのリンクを使用してください。完全な Python Lambda 関数も使用可能なチュートリアルについては、AWS クラウド へのデータストリームエクスポート (コンソール) または AWS クラウド へのデータストリームエクスポート (CLI) を参照してください。

Lambda 関数では、関数ハンドラの外部で StreamManagerClient をインスタンス化する必要があります。ハンドラでインスタンス化されると、関数は呼び出されるたびに client およびストリームマネージャへの接続を作成します。

注記

ハンドラで StreamManagerClient のインスタンス化を行う場合は、client が作業を完了したときに、close() メソッドを明示的に呼び出す必要があります。それ以外の場合、client は接続を開いたままにし、スクリプトが終了するまで別のスレッドを実行します。

StreamManagerClient では次の操作がサポートされています。

メッセージストリームの作成

ストリームを作成するには、ユーザー定義の Lambda 関数で create メソッドを呼び出し、MessageStreamDefinition オブジェクトを渡します。このオブジェクトによって、ストリームの一意の名前を指定すると共に、最大ストリームサイズに達したときにストリームマネージャーが新しいデータを処理する方法を定義します。MessageStreamDefinition とそのデータ型 (ExportDefinitionStrategyOnFullPersistence など) を使用して、他のストリームプロパティを定義できます。具体的には次のとおりです。

  • 自動エクスポートのターゲットAWS IoT Analytics、Kinesis Data Streams、AWS IoT SiteWise、Amazon S3 送信先。詳細については、「AWS クラウド でサポートされている送信先のエクスポート設定」を参照してください。

  • エクスポートの優先度。ストリームマネージャーは、プライオリティの低いストリームよりも先にプライオリティの高いストリームをエクスポートします。

  • AWS IoT Analytics、Kinesis Data Streams、AWS IoT SiteWise 送信先の最大バッチサイズとバッチ間隔。ストリームマネージャーは、いずれかの条件が満たされたときにメッセージをエクスポートします。

  • 有効期限 (TTL) ストリームデータが処理可能であることを保証する時間。この期間内にデータを消費できることを確認する必要があります。これは削除ポリシーではありません。TTL 期間の直後にデータが削除されない場合があります。

  • ストリームの永続性。ストリームをファイルシステムに保存して、コアを再起動してもデータを保持するか、ストリームをメモリに保存するかを選択します。

  • 開始シーケンス番号。エクスポートで開始メッセージとして使用するメッセージのシーケンス番号を指定します。

MessageStreamDefinition の詳細については、ターゲット言語の SDK リファレンスを参照してください。

注記

StreamManagerClient を利用すると、ターゲットの送信先を使用して、ストリームを HTTP サーバーにエクスポートできます。このターゲットは、テストのみを目的としています。また、安定しておらず、実稼働環境での使用はサポートされていません。

ストリームが作成されると、Lambda 関数はストリームにメッセージを追加してエクスポート用データを送信します。また、ローカル処理用にストリームからメッセージを読み取ります。作成するストリームの数は、ハードウェアの機能とビジネスケースによって異なります。対策を 1 つ挙げるとすれば、それは、AWS IoT Analytics または Kinesis データストリームでターゲットチャネルごとにストリームを作成することです (ただし、1 つのストリームに複数のターゲットを定義できます)。ストリームは寿命に耐久性があります。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

注記

AWS IoT SiteWise または Amazon S3 をエクスポート先とするストリームを作成する場合、次の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.11.0

  • AWS IoT Greengrass Core SDK の最小バージョン: 1.6.0  |  Java: 1.5.0  |  Node.js: 1.7.0

次のスニペットでは、StreamName という名前のストリームが作成されます。これにより、MessageStreamDefinition と下位のデータ型でストリームプロパティを定義します。

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS クラウド. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: create_message_stream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: createMessageStream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3TaskExecutor(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: createMessageStream | MessageStreamDefinition

エクスポート先設定の詳細については、「AWS クラウド でサポートされている送信先のエクスポート設定」を参照してください。

 

メッセージの追加

ストリームマネージャーにエクスポート用データを送信するには、Lambda 関数でそのデータをターゲットストリームに追加します。このエクスポート先によって、このメソッドに渡すデータ型が決まります。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

注記

AWS IoT SiteWise または Amazon S3 をエクスポート先としてメッセージを追加する場合、次の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.11.0

  • AWS IoT Greengrass Core SDK の最小バージョン: 1.6.0  |  Java: 1.5.0  |  Node.js: 1.7.0

AWS IoT Analytics または Kinesis Data Streams をエクスポート先とする

次のスニペットは、StreamName という名前のストリームにメッセージを追加します。AWS IoT Analytics または Kinesis Data Streams を送信先とする場合、Lambda 関数はデータの BLOB を追加します。

このスニペットには以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: append_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: appendMessage

AWS IoT SiteWise をエクスポート先とする

次のスニペットは、StreamName という名前のストリームにメッセージを追加します。AWS IoT SiteWise を送信先とする場合、Lambda 関数はシリアル化された PutAssetPropertyValueEntry オブジェクトを追加します。詳細については、「AWS IoT SiteWise にエクスポートする」を参照してください。

注記

AWS IoT SiteWise にデータを送信する場合、データは BatchPutAssetPropertyValue アクションの要件を満たしている必要があります。詳は、AWS IoT SiteWise API リファレンスBatchPutAssetPropertyValue を参照してください。

このスニペットには以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.11.0

  • AWS IoT Greengrass Core SDK の最小バージョン: 1.6.0  |  Java: 1.5.0  |  Node.js: 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: append_message | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: appendMessage | PutAssetPropertyValueEntry

Amazon S3 をエクスポート先とする

次のスニペットは、StreamName という名前のストリームにエクスポートタスクを追加します。Amazon S3 を送信先とする場合、Lambda 関数は、シリアル化された S3ExportTaskDefinition オブジェクトを追加します。これには、ソース入力ファイルとターゲット Amazon S3 オブジェクトに関する情報が含まれています。指定されたオブジェクトが存在しない場合、ストリームマネージャーがそのオブジェクトを作成します。詳細については、「Amazon S3 へのエクスポート」を参照してください。

このスニペットには以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.11.0

  • AWS IoT Greengrass Core SDK の最小バージョン: 1.6.0  |  Java: 1.5.0  |  Node.js: 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: append_message | S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: appendMessage | S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: appendMessage | S3ExportTaskDefinition

 

メッセージの読み取り

ストリームからメッセージを読み取ります。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

次のスニペットは、StreamName という名前のストリームからメッセージを読み取ります。読み取りメソッドは、読み取りを開始するシーケンス番号、読み取る最小値と最大値、メッセージを読み取るためのタイムアウトを指定するオプションの ReadMessagesOptions オブジェクトを受け取ります。

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: read_messages | ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: readMessages | ReadMessagesOptions

 

ストリームのリスト表示

ストリームマネージャーでストリームのリストを取得します。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

次のスニペットは、ストリームマネージャーのストリームのリストを (名前で) 取得します。

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: list_streams

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: listStreams

 

メッセージストリームの説明

ストリームの定義、サイズ、エクスポートステータスなど、ストリームに関するメタデータを取得します。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

次のスニペットは、ストリームの定義、サイズ、エクスポーターのステータスなど、StreamName という名前のストリームに関するメタデータを取得します。

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: describe_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: describeMessageStream

 

メッセージストリームの更新

既存ストリームのプロパティを更新します。ストリームを作成した後に要件が変更された場合、ストリームの更新が必要となる場合があります。例:

  • AWS クラウド 送信先の新しいエクスポート設定を追加します。

  • ストリームの最大サイズを大きくして、データのエクスポート方法や保持方法を変更します。例えば、ストリームサイズを strategy on full 設定と組み合わせると、データが削除または拒否されることがあります。

  • エクスポートを一時停止して再開します。例えば、エクスポートタスクが長時間実行されているため、アップロードするデータ量を抑える場合などです。

Lambda 関数は、次の高レベルプロセスに従ってストリームを更新します。

  1. ストリームを説明する情報を取得します。

  2. 対応する MessageStreamDefinition オブジェクトと下位オブジェクトのターゲットプロパティを更新します。

  3. 更新済みの MessageStreamDefinition を渡します。更新済みストリームの完全なオブジェクト定義を必ず含めてください。未定義のプロパティは既定値に戻ります。

    エクスポートで開始メッセージとして使用するメッセージのシーケンス番号を指定できます。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.11.0

  • AWS IoT Greengrass Core SDK の最小バージョン: 1.6.0  |  Java: 1.5.0  |  Node.js: 1.7.0

次のスニペットは、StreamName という名前のストリームを更新します。また、Kinesis Data Streams にエクスポートするストリームの複数のプロパティを更新します。

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: updateMessageStream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: update_message_stream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS クラウド. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: updateMessageStream | MessageStreamDefinition

ストリーム更新に対する制約

ストリームを更新する場合、以下の制約が適用されます。次のリストに記載のない限り、更新は直ちに有効になります。

  • ストリームの永続性を更新することはできません。この動作を変更するには、ストリームを削除し、新しい永続性ポリシーを定義するストリームを作成します。

  • ストリームの最大サイズは、次の条件を満たしている場合にのみ更新できます。

    • 最大サイズは、現在のストリームサイズ以上である必要があります。この情報を見つけるには、ストリームを記述して、返された MessageStreamInfo オブジェクトのストレージステータスを確認します。

    • 最大サイズは、ストリームのセグメントサイズ以上である必要があります。

  • ストリームセグメントサイズは、ストリームの最大サイズよりも小さい値に更新できます。更新した設定は、新しいセグメントに適用されます。

  • 有効期限 (TTL) プロパティの更新は、新しい追加操作に適用されます。この値を小さくすると、ストリームマネージャーが TTL を超える既存のセグメントを削除することもあります。

  • strategy on full プロパティの更新は、新しい追加操作に適用されます。最も古いデータを上書きするようにその対策を設定すると、ストリームマネージャーが新しい設定に基づいて既存のセグメントを上書きすることがあります。

  • flush on write プロパティの更新は、新しいメッセージに適用されます。

  • エクスポート設定の更新は、新しいエクスポートに適用されます。更新リクエストには、サポートするすべてのエクスポート設定を含める必要があります。含めない場合、ストリームマネージャーはそれらを削除します。

    • エクスポート設定の更新時には、ターゲットエクスポート設定の識別子を指定します。

    • エクスポート設定を追加するには、新しいエクスポート設定の一意の識別子を指定します。

    • エクスポート設定を削除するには、エクスポート設定を省略します。

  • ストリーム内のエクスポート設定の開始シーケンス番号を更新するには、最後のシーケンス番号よりも小さい値を指定する必要があります。この情報を見つけるには、ストリームを記述して、返された MessageStreamInfo オブジェクトのストレージステータスを確認します。

 

メッセージストリームの削除

ストリームを削除します。ストリームを削除すると、ストリームに保存されているすべてのデータがディスクから削除されます。

要件

この操作には以下の要件があります。

  • AWS IoT Greengrass Core の最小バージョン: 1.10.0

  • AWS IoT Greengrass Core SDK の最小バージョン: Python: 1.5.0  |  Java: 1.4.0  |  Node.js: 1.6.0

次のスニペットは、StreamName という名前のストリームを削除します。

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: deleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: deleteMessageStream

以下も参照してください。