AWS クラウド でサポートされている送信先のエクスポート設定 - AWS IoT Greengrass

AWS IoT Greengrass Version 1 は、2023 年 6 月 30 日に延長ライフフェーズに入りました。詳細については、「AWS IoT Greengrass V1 メンテナンスポリシー」を参照してください。この日以降、 AWS IoT Greengrass V1 は機能、機能強化、バグ修正、またはセキュリティパッチを提供する更新をリリースしません。で実行されるデバイスは中断 AWS IoT Greengrass V1 されず、引き続き運用され、クラウドに接続されます。への移行 AWS IoT Greengrass Version 2を強くお勧めします。これにより、重要な新機能が追加され、追加のプラットフォーム がサポートされます

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS クラウド でサポートされている送信先のエクスポート設定

ユーザー定義の Lambda 関数は、StreamManagerClient Core SDK の AWS IoT Greengrass を使用して、ストリームマネージャーとやり取りを行います。Lambda 関数は、ストリームの作成またはストリームの更新を行う際に、MessageStreamDefinition オブジェクト (エクスポート定義などのストリームプロパティ) を渡します。ExportDefinition オブジェクトには、そのストリームに定義されたエクスポート設定が含まれています。ストリームマネージャーは、これらのエクスポート設定を使用して、ストリームをエクスポートする場所と方法を決定します。

ExportDefinition プロパティタイプのオブジェクトモデル図。

1 つのストリームに 0 または 1 つ以上のエクスポートを定義できます。この場合、1 つの送信先タイプに複数のエクスポートを定義することも可能です。例えば、ストリームを 2 つの AWS IoT Analytics チャネルと 1 つの Kinesis データストリームにエクスポートできます。

エクスポートに失敗した場合、ストリームマネージャーは最大 5 分間隔で AWS クラウド へのデータのエクスポートを継続的に再試行します。再試行回数に上限はありません。

注記

StreamManagerClient を利用すると、ターゲットの送信先を使用して、ストリームを HTTP サーバーにエクスポートできます。このターゲットは、テストのみを目的としています。また、安定しておらず、実稼働環境での使用はサポートされていません。

こうした AWS クラウド リソースはユーザー側で維持する必要があります。

AWS IoT Analytics チャネル

ストリームマネージャーは、AWS IoT Analytics への自動エクスポートをサポートしています。AWS IoT Analytics による高度なデータ分析が、ビジネス上の意思決定や、機械学習モデルの改善に役立ちます。詳細については、「AWS IoT Analytics ユーザーガイド」の「AWS IoT Analytics とは?」を参照してください。

AWS IoT Greengrass Core SDK では、Lambda 関数によって IoTAnalyticsConfig を使用し、この送信先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

要件

このエクスポート先には以下の要件があります。

  • AWS IoT Analytics のターゲットチャネルは、Greengrass グループと同じ AWS アカウント と AWS リージョン にある必要があります。

  • Greengrass グループのロール では、ターゲットチャネルに対する iotanalytics:BatchPutMessage 権限が許可されている必要があります。例:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

AWS IoT Analytics にエクスポートする

AWS IoT Analytics にエクスポートするストリームを作成するには、Lambda 関数で 1 つ以上の IoTAnalyticsConfig オブジェクトを含むエクスポート定義を使用してストリームを作成します。このオブジェクトによって、ターゲットチャネル、バッチサイズ、バッチ間隔、優先度などのエクスポート設定を定義します。

Lambda 関数は、デバイスからデータを受信した際に、データの BLOB を含むメッセージをターゲットストリームに追加します。

その後、ストリームマネージャーは、ストリームのエクスポート設定で定義されたバッチ設定と優先度に基づいてデータをエクスポートします。

 

Amazon Kinesis Data Streams

ストリームマネージャーは、Amazon Kinesis Data Streams への自動エクスポートをサポートしています。Kinesis Data Streams は、一般的に、大量のデータを集約して、データウェアハウスまたは MapReduce クラスターに読み込むために使用されます。詳細については、「Amazon Kinesis デベロッパーガイド」の「Amazon Kinesis Data Streams とは」を参照してください。

AWS IoT Greengrass Core SDK では、Lambda 関数によって KinesisConfig を使用し、この送信先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

要件

このエクスポート先には以下の要件があります。

  • Kinesis Data Streams のターゲットストリームは、Greengrass グループと同じ AWS アカウント と AWS リージョン にある必要があります。

  • Greengrass グループのロール では、ターゲットデータストリームに対する kinesis:PutRecords 権限が許可されている必要があります。例:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

Kinesis Data Streams へのエクスポート

Kinesis Data Streams にエクスポートするストリームを作成するには、Lambda 関数で 1 つ以上の KinesisConfig オブジェクトを含むエクスポート定義を使用してストリームを作成します。このオブジェクトによって、ターゲットデータストリーム、バッチサイズ、バッチ間隔、優先度などのエクスポート設定を定義します。

Lambda 関数は、デバイスからデータを受信した際に、データの BLOB を含むメッセージをターゲットストリームに追加します。その後、ストリームマネージャーは、ストリームのエクスポート設定で定義されたバッチ設定と優先度に基づいてデータをエクスポートします。

ストリームマネージャーは、Amazon Kinesis にアップロードされた各レコードのパーティションキーとして、一意のランダムな UUID を生成します。

 

AWS IoT SiteWise アセットプロパティ

ストリームマネージャーは、AWS IoT SiteWise への自動エクスポートをサポートしています。AWS IoT SiteWiseを使用すると、産業機器からのデータを大規模に収集、整理、分析できます。詳細については、「AWS IoT SiteWise ユーザーガイド」の「AWS IoT SiteWise とは?」を参照してください。

AWS IoT Greengrass Core SDK では、Lambda 関数によって IoTSiteWiseConfig を使用し、この送信先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

注記

AWS では、OPC-UA ソースと共に使用可能な事前構築済みソリューションである IoT SiteWise コネクタ も提供されています。

要件

このエクスポート先には以下の要件があります。

  • AWS IoT SiteWise のターゲットアセットプロパティは、Greengrass グループと同じ AWS アカウント と AWS リージョン にある必要があります。

    注記

    AWS IoT SiteWise がサポートするリージョンのリストについては、「AWS 全般のリファレンス」の「AWS IoT SiteWise エンドポイントとクォータ」を参照してください。

  • Greengrass グループのロール では、アセットプロパティに対する iotsitewise:BatchPutAssetPropertyValue 権限が許可されている必要があります。次の例では、ポリシーで iotsitewise:assetHierarchyPath 条件キーを使用して、ターゲットルートアセットとその子へのアクセスを許可しています。ポリシーから Condition を削除して、すべての AWS IoT SiteWise アセットへのアクセスを許可したり、個々のアセットの ARN を指定したりできます。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

    重要なセキュリティ情報については、「AWS IoT SiteWise ユーザーガイド」の「BatchPutAssetPropertyValue 認証」を参照してください。

AWS IoT SiteWise にエクスポートする

AWS IoT SiteWise にエクスポートするストリームを作成するには、Lambda 関数で 1 つ以上の IoTSiteWiseConfig オブジェクトを含むエクスポート定義を使用してストリームを作成します。このオブジェクトによって、バッチサイズ、バッチ間隔、優先度などのエクスポート設定を定義します。

Lambda 関数は、デバイスからアセットプロパティデータを受信した際に、そのデータを含むメッセージをターゲットストリームに追加します。メッセージは、JSON シリアル化された PutAssetPropertyValueEntry オブジェクトであり、これには、1 つ以上のアセットプロパティに対するプロパティ値が含まれています。詳細については、AWS IoT SiteWise のエクスポート先に関する「メッセージの追加」を参照してください。

注記

AWS IoT SiteWise にデータを送信する場合、データは BatchPutAssetPropertyValue アクションの要件を満たしている必要があります。詳は、AWS IoT SiteWise API リファレンスBatchPutAssetPropertyValue を参照してください。

その後、ストリームマネージャーは、ストリームのエクスポート設定で定義されたバッチ設定と優先度に基づいてデータをエクスポートします。

 

ストリームマネージャーの設定と Lambda 関数ロジックを調整して、エクスポート対策を設計できます。例:

  • ほぼリアルタイムのエクスポートでは、少ないバッチサイズと短い間隔を設定して、受信したデータをストリームに追加します。

  • バッチの最適化、帯域幅の制約軽減、コスト最小化を行うには、Lambda 関数を使用して、受信した、単一アセットプロパティのタイムスタンプ品質値 (TQV) データポイントをプールし、その後、データをストリームに追加します。対策を 1 つ挙げるとすれば、それは、同じプロパティのエントリを複数送信するのではなく、最大 10 の異なるプロパティとアセットの組み合わせ、またはプロパティエイリアスのエントリを 1 つのメッセージでバッチ処理することです。これにより、ストリームマネージャーは AWS IoT SiteWise クォータ内にとどまることができます。

 

Amazon S3 オブジェクト

ストリームマネージャーは、Amazon S3 への自動エクスポートをサポートしています。Amazon S3 を使用すると、膨大なデータの保存と取得を行えます。詳細については、「Amazon Simple Storage Service デベロッパーガイド」の「Amazon S3 とは」を参照してください。

AWS IoT Greengrass Core SDK では、Lambda 関数によって S3ExportTaskExecutorConfig を使用し、この送信先タイプのエクスポート設定を定義します。詳細については、ターゲット言語の SDK リファレンスを参照してください。

要件

このエクスポート先には以下の要件があります。

  • ターゲット Amazon S3 バケットは Greengrass グループと同じ AWS アカウント にある必要があります。

  • Greengrass グループのデフォルトのコンテナ化Greengrass コンテナの場合に、/tmp 下にある入力ファイルディレクトリ、またはルートファイルシステム上にない入力ファイルディレクトリを使用するには、STREAM_MANAGER_READ_ONLY_DIRS パラメータを設定する必要があります。

  • Greengrass コンテナモードで実行している Lambda 関数によって入力ファイルを入力ファイルディレクトリに書き込む場合は、そのディレクトリ用にローカルボリュームリソースを作成し、書き込み権限でディレクトリをコンテナにマウントする必要があります。これにより、ファイルがルートファイルシステムに書き込まれ、コンテナ外に表示されるようになります。詳細については、「Lambda 関数とコネクタを使用してローカルリソースにアクセスする」を参照してください。

  • Greengrass グループのロール では、ターゲットバケットに対する次の権限が許可されている必要があります。例:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    リソースにきめ細かいアクセス権限または条件付きアクセス権限を付与できます (例えば、ワイルドカード * 命名スキームを使用)。詳細については、「IAM ユーザーガイド」の「IAM ポリシーの追加と削除」を参照してください。

Amazon S3 へのエクスポート

Amazon S3 にエクスポートするストリームを作成するには、Lambda 関数で S3ExportTaskExecutorConfig オブジェクトを使用してエクスポートポリシーを設定します。このポリシーによって、マルチパートアップロードのしきい値や優先度といったエクスポート設定を定義します。Amazon S3 エクスポートでは、ストリームマネージャーが、コアデバイス上のローカルファイルから読み取るデータをアップロードします。アップロードを開始するには、Lambda 関数でエクスポートタスクをターゲットストリームに追加します。エクスポートタスクには、入力ファイルとターゲット Amazon S3 オブジェクトの情報が含まれます。ストリームマネージャーは、ストリームに追加された順にタスクを実行します。

注記

ターゲットバケットは、AWS アカウント に存在しなければなりません。指定したキーのオブジェクトが存在しない場合は、ストリームマネージャーによってオブジェクトが作成されます。

以下の図に、この高レベルのワークフローを示します。

Amazon S3 エクスポートのストリームマネージャーワークフローを示す図。

ストリームマネージャーは、マルチパートアップロードしきい値プロパティ、最小パーツサイズ設定、入力ファイルサイズを使用して、データのアップロード方法を決定します。マルチパートアップロードのしきい値は、最小パートサイズ以上でなければなりません。データを並行してアップロードする場合は、複数のストリームを作成できます。

ターゲット Amazon S3 オブジェクトを指定するキーには、有効な Java DateTimeFormatter 文字列を !{timestamp:value} プレースホルダーに含めることができます。これらのタイムスタンププレースホルダーを使用すると、入力ファイルデータがアップロードされた時刻に基づいて Amazon S3 のデータを分割できます。例えば、次のキー名は、my-key/2020/12/31/data.txt などの値に解決されます。

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
注記

ストリームのエクスポートステータスを監視する場合は、まずステータスストリームを作成して、そのストリームを使用するようにエクスポートストリームを設定します。詳細については、「エクスポートタスクの監視」を参照してください。

入力データの管理

IoT アプリケーションが入力データのライフサイクル管理に使用するコードを作成できます。次のワークフロー例は、Lambda 関数を使用してこのデータを管理する方法を示しています。

  1. ローカルプロセスは、デバイスまたは周辺機器からデータを受信し、コアデバイスのディレクトリ内にあるファイルにデータを書き込みます。これらが、ストリームマネージャーの入力ファイルとなります。

    注記

    入力ファイルディレクトリへのアクセスを設定する必要があるかどうかを判断するには、STREAM_MANAGER_READ_ONLY_DIRS パラメータを参照してください。

    ストリームマネージャーが実行されるプロセスは、グループのデフォルトアクセス ID のファイルシステム権限をすべて継承します。ストリームマネージャーには、入力ファイルへのアクセス許可が必要です。必要に応じて、chmod(1) コマンドを使用して、ファイルへのアクセス許可を変更できます。

  2. Lambda 関数はディレクトリをスキャンし、新しいファイルが作成されると、ターゲットストリームにエクスポートタスクを追加します。このタスクは、JSON シリアル化された S3ExportTaskDefinition オブジェクトであり、これによって、入力ファイルの URL、ターゲットの Amazon S3 バケットとキー、オプションのユーザーメタデータが指定されます。

  3. ストリームマネージャーは、入力ファイルを読み取り、追加されたタスクの順に Amazon S3 にデータをエクスポートします。ターゲットバケットは、AWS アカウント に存在しなければなりません。指定したキーのオブジェクトが存在しない場合は、ストリームマネージャーによってオブジェクトが作成されます。

  4. Lambda 関数は、ステータスストリームからメッセージを読み取り、エクスポートステータスを監視します。エクスポートタスクが完了すると、Lambda 関数は対応する入力ファイルを削除します。詳細については、「エクスポートタスクの監視」を参照してください。

エクスポートタスクの監視

IoT アプリケーションが Amazon S3 エクスポートのステータス監視に使用するコードを作成できます。Lambda 関数は、ステータスストリームを作成して、そのストリームにステータス更新を書き込むようにエクスポートストリームを設定する必要があります。1 つのステータスストリームは、Amazon S3 にエクスポートする複数のストリームからステータスの更新を受け取ることができます。

まず、ステータスストリームとして使用するストリームを作成します。ストリームのサイズとリテンションポリシーを設定して、ステータスメッセージのライフスパンを制御できます。例:

  • ステータスメッセージを保存しない場合は、PersistenceMemory に設定します。

  • 新しいステータスメッセージが失われないようにするには、StrategyOnFullOverwriteOldestData に設定します。

次に、ステータスストリームを使用するようにエクスポートストリームを作成または更新します。具体的には、ストリームの S3ExportTaskExecutorConfig エクスポート設定のステータス構成プロパティを設定します。これにより、エクスポートタスクに関するステータスメッセージをステータスストリームに書き込むようにストリームマネージャーに指示します。StatusConfig オブジェクトで、ステータスストリームの名前と冗長性のレベルを指定します。サポート対象の値を次に示します。最も冗長でないもの (ERROR) から最も冗長なもの (TRACE) を表しています。デフォルト: INFO

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

 

次のワークフロー例は、Lambda 関数がステータスストリームを使用してエクスポートステータスを監視する方法を示しています。

  1. 前のワークフローで説明したように、Lambda 関数は、エクスポートタスクに関するステータスメッセージをステータスストリームに書き込むように設定されたストリームにエクスポートタスクを追加します。この追加の操作によって、タスク ID を表すシーケンス番号が返ります。

  2. Lambda 関数は、ステータスストリームからメッセージを順番に読み取ります。その後、ストリーム名とタスク ID に基づいて、またはメッセージコンテキストからのエクスポートタスクプロパティに基づいてメッセージをフィルタリングします。例えば、Lambda 関数は、エクスポートタスクの入力ファイル URL でフィルタリングできます。このタスクは、メッセージコンテキストの S3ExportTaskDefinition オブジェクトで表されます。

    次のステータスコードは、エクスポートタスクが完了の状態になったことを示します。

    • Success。アップロードは正常に完了しました。

    • Failure。ストリームマネージャーでエラー (例: 指定したバケットが存在しないなど) が発生しました。問題の解決後に、エクスポートタスクをストリームに再度追加できます。

    • Canceled。ストリームまたはエクスポートの定義が削除された、もしくはタスクの存続期間 (TTL) の有効期限が切れたため、タスクは中止されました。

    注記

    タスクのステータスは InProgress または Warning の場合もあります。ストリームマネージャーは、タスクの実行に影響しないエラーがイベントから返ったときに警告を発行します。例えば、中断された部分的アップロードのクリーンアップが失敗すると、警告を返します。

  3. エクスポートタスクが完了すると、Lambda 関数は対応する入力ファイルを削除します。

次の例は、Lambda 関数がステータスメッセージを読み取り、処理する方法を示しています。

Python
import time from greengrasssdk.stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from greengrasssdk.stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK リファレンス: read_messages | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK リファレンス: readMessages | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require('aws-greengrass-core-sdk').StreamManager; const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK リファレンス: readMessages | StatusMessage