スループットの増加に合わせた入力ストリームの並列処理 - Amazon Kinesis Data Analytics for SQL Applications デベロッパーガイド

慎重に検討した結果、2 つのステップでSQLアプリケーションの Amazon Kinesis Data Analytics を中止することにしました。

1. 2025 年 10 月 15 日以降、SQLアプリケーション用の新しい Kinesis Data Analytics を作成することはできません。

2. 2026 年 1 月 27 日以降、アプリケーションは削除されます。SQL アプリケーションの Amazon Kinesis Data Analytics を起動または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは利用できなくなります。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の停止」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

スループットの増加に合わせた入力ストリームの並列処理

注記

2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「制限」を参照してください。

Amazon Kinesis Data Analytics アプリケーションでは、アプリケーション内入力ストリームのスループットを超えるアプリケーションをスケーリングするために、複数のアプリケーション内入力ストリームをサポートできます。アプリケーション内入力ストリームの詳細については、「Amazon Kinesis Data Analytics for SQL Applications: 仕組み」を参照してください。

ほとんどの場合、Amazon Kinesis Data Analytics では、アプリケーションにフィードされる Kinesis ストリームまたは Firehose ソースストリームの容量を処理できるように、アプリケーションがスケールされます。ただし、ソースストリームのスループットが、単一のアプリケーション内入力ストリームのスループットを超える場合は、アプリケーションで使用されるアプリケーション内入力ストリームの数を明示的に増やすことができます。そのためには、InputParallelism パラメータを使用します。

InputParallelism パラメータが 1 以上の場合、Amazon Kinesis Data Analytics は、アプリケーション内ストリーム間のソースストリームのパーティションを均等に分割します。たとえば、ソースストリームに 50 シャードあり、InputParallelism2 に設定した場合、アプリケーション内入力ストリームはそれぞれ、25 のソースストリームのシャードから入力を受け取ります。

アプリケーション内ストリームの数を増やす場合は、アプリケーションから、各ストリームのデータに明示的にアクセスする必要があります。コードで複数のアプリケーション内ストリームにアクセスする方法については、「Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス」を参照してください。

Kinesis Data Streams と Firehose ストリームのシャードは、どちらも同様にアプリケーション内ストリーム間で分割されますが、アプリケーションに認識される形式は異なります。

  • Kinesis データストリームのレコードには、shard_id フィールドが含まれており、レコードのソースシャードを識別できます。

  • Firehose 配信ストリームのレコードには、レコードのソースシャードまたはパーティションを識別するフィールドは含まれていません。これは、Firehose がこの情報をアプリケーションから抽象化するためです。

アプリケーション内入力ストリームの数の増加を評価する

ほとんどの場合、入力ストリームの複雑性やデータサイズに応じて、1 つのアプリケーション内入力ストリームで、1 つのソースストリームのスループットを処理することができます。アプリケーション内入力ストリームの数を増やす必要の有無を判断するには、Amazon CloudWatch の InputBytes および MillisBehindLatest メトリクスをモニタリングします。

InputBytes メトリクスが 100 MB/秒より大きい場合 (または、このレートより大きくなることが予想される場合)、MillisBehindLatest が増えたり、アプリケーションの問題の影響が大きくなったりする可能性があります。この問題に対応するため、アプリケーションに対して次の言語を選択することをお勧めします。

  • アプリケーションのスケーリングニーズが 100 MB/秒を超える場合は、複数のストリームと Kinesis Data Analytics for SQL アプリケーションを使用します。

  • 1 つのストリームとアプリケーションを使用する場合は、Kinesis Data Analytics for Java Applications を使用します。

MillisBehindLatest メトリクスに次のいずれかの特性がある場合は、アプリケーションの InputParallelism 設定を増やす必要があります。

  • MillisBehindLatest メトリクスが増加しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データが遅延していることを意味します。

  • MillisBehindLatest メトリクスは一貫して 1,000 (1 秒あたり) を超えています。

以下が真の場合は、アプリケーションの InputParallelism 設定を増やす必要はありません。

  • MillisBehindLatest メトリクスが減少しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データの遅れを取り戻していることを意味します。

  • MillisBehindLatest メトリクスは一貫して 1,000 (1 秒あたり) を下回っています。

CloudWatch の使用方法の詳細については、「CloudWatch ユーザーガイド」を参照してください。

複数のアプリケーション内入力ストリームの実装

CreateApplication」を使用してアプリケーションを作成する際、アプリケーション内入力ストリームの数を設定できます。この数は、「UpdateApplication」を使用してアプリケーションを作成した後に設定します。

注記

InputParallelism 設定は、Amazon Kinesis Data Analytics API または AWS CLI を使ってのみ設定できます。AWS Management Console を使用してこの設定を行うことはできません。AWS CLI のセットアップに関する詳細については、「ステップ 2: AWS Command Line Interface (AWS CLI) をセットアップする」を参照してください。

新しいアプリケーションの入力ストリームカウントの設定

次の例では、API アクション (CreateApplication) を使用して、新しいアプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。

CreateApplication の詳細については、「CreateApplication」を参照してください。

{ "ApplicationCode": "<The SQL code the new application will run on the input stream>", "ApplicationDescription": "<A friendly description for the new application>", "ApplicationName": "<The name for the new application>", "Inputs": [ { "InputId": "ID for the new input stream", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }

既存アプリケーションの入力ストリームカウントの設定

次の例では、API アクション (UpdateApplication) を使用して、既存アプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。

Update_Application の詳細については、「UpdateApplication」を参照してください。

{ "InputUpdates": [ { "InputId": "yourInputId", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }

Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス

複数のアプリケーション内入力ストリームをアプリケーションで使用するには、別のストリームから明示的に選択する必要があります。次のコード例では、入門チュートリアルで作成した複数の入力ストリームをアプリケーションでクエリを行う方法について説明します。

次の例では、in_application_stream001 という 1 つのアプリケーション内ストリームに結合される前に、まず COUNT を使用して各ソースストリームが集約されます。事前にソースストリームを集約すると、結合されたアプリケーション内ストリームで、負荷をかけ過ぎることなく複数のストリームからのトラフィックを処理しやすくなります。

注記

この例を実行して、両方のアプリケーション内入力ストリームから結果を得るには、ソースストリームのシャード数とアプリケーションの InputParallelism パラメータを両方とも更新します。

CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;

前述のコード例では、以下のような出力を in_application_stream001 に生成します。

Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.

追加の考慮事項

複数の入力ストリームを使用する場合は、以下の点に注意してください。

  • アプリケーション内入力ストリームの最大数は 64 です。

  • アプリケーション内入力ストリームは、アプリケーションの入力ストリームのシャード間で均等に分散されます。

  • アプリケーション内ストリームの追加により向上するパフォーマンスは、直線的にスケールしません。つまり、アプリケーション内ストリームの数を 2 倍にしても、スループットは 2 倍になりません。一般的な行サイズを使用すると、アプリケーション内ストリームはそれぞれ、1 秒あたり約 5,000~15,000 行のスループットを達成します。アプリケーション内ストリームカウントを 10 に増やすことによって、1 秒あたり 20,000~30,000 行のスループットを達成できます。スループット速度は、入力ストリームのフィールドのカウント、データ型、サイズによって異なります。

  • 一部の集計関数 (AVG) では、別のシャードに分割されている入力ストリームに適用されると、予期しない結果が生成される場合があります。集計ストリームに結合する前に、個々のシャードで集計オペレーションを実行する必要があるため、レコードが多く含まれているストリームに関係なく加重される場合があります。

  • 入力ストリームの数を増やした後にアプリケーションのパフォーマンスが低下し続ける (高い MillisBehindLatest メトリクスにより反映される) 場合は、Kinesis 処理ユニット (KPU) の上限に達している可能性があります。詳細については、「アプリケーションを自動的にスケーリングしてスループットを向上させる」を参照してください。