慎重に検討した結果、2 つのステップでSQLアプリケーションの Amazon Kinesis Data Analytics を中止することにしました。
1. 2025 年 10 月 15 日以降、SQLアプリケーション用の新しい Kinesis Data Analytics を作成することはできません。
2. 2026 年 1 月 27 日以降、アプリケーションは削除されます。SQL アプリケーションの Amazon Kinesis Data Analytics を起動または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは利用できなくなります。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の停止」を参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
スループットの増加に合わせた入力ストリームの並列処理
注記
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「制限」を参照してください。
Amazon Kinesis Data Analytics アプリケーションでは、アプリケーション内入力ストリームのスループットを超えるアプリケーションをスケーリングするために、複数のアプリケーション内入力ストリームをサポートできます。アプリケーション内入力ストリームの詳細については、「Amazon Kinesis Data Analytics for SQL Applications: 仕組み」を参照してください。
ほとんどの場合、Amazon Kinesis Data Analytics では、アプリケーションにフィードされる Kinesis ストリームまたは Firehose ソースストリームの容量を処理できるように、アプリケーションがスケールされます。ただし、ソースストリームのスループットが、単一のアプリケーション内入力ストリームのスループットを超える場合は、アプリケーションで使用されるアプリケーション内入力ストリームの数を明示的に増やすことができます。そのためには、InputParallelism
パラメータを使用します。
InputParallelism
パラメータが 1 以上の場合、Amazon Kinesis Data Analytics は、アプリケーション内ストリーム間のソースストリームのパーティションを均等に分割します。たとえば、ソースストリームに 50 シャードあり、InputParallelism
を 2
に設定した場合、アプリケーション内入力ストリームはそれぞれ、25 のソースストリームのシャードから入力を受け取ります。
アプリケーション内ストリームの数を増やす場合は、アプリケーションから、各ストリームのデータに明示的にアクセスする必要があります。コードで複数のアプリケーション内ストリームにアクセスする方法については、「Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス」を参照してください。
Kinesis Data Streams と Firehose ストリームのシャードは、どちらも同様にアプリケーション内ストリーム間で分割されますが、アプリケーションに認識される形式は異なります。
Kinesis データストリームのレコードには、
shard_id
フィールドが含まれており、レコードのソースシャードを識別できます。Firehose 配信ストリームのレコードには、レコードのソースシャードまたはパーティションを識別するフィールドは含まれていません。これは、Firehose がこの情報をアプリケーションから抽象化するためです。
アプリケーション内入力ストリームの数の増加を評価する
ほとんどの場合、入力ストリームの複雑性やデータサイズに応じて、1 つのアプリケーション内入力ストリームで、1 つのソースストリームのスループットを処理することができます。アプリケーション内入力ストリームの数を増やす必要の有無を判断するには、Amazon CloudWatch の InputBytes
および MillisBehindLatest
メトリクスをモニタリングします。
InputBytes
メトリクスが 100 MB/秒より大きい場合 (または、このレートより大きくなることが予想される場合)、MillisBehindLatest
が増えたり、アプリケーションの問題の影響が大きくなったりする可能性があります。この問題に対応するため、アプリケーションに対して次の言語を選択することをお勧めします。
アプリケーションのスケーリングニーズが 100 MB/秒を超える場合は、複数のストリームと Kinesis Data Analytics for SQL アプリケーションを使用します。
1 つのストリームとアプリケーションを使用する場合は、Kinesis Data Analytics for Java Applications を使用します。
MillisBehindLatest
メトリクスに次のいずれかの特性がある場合は、アプリケーションの InputParallelism
設定を増やす必要があります。
MillisBehindLatest
メトリクスが増加しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データが遅延していることを意味します。MillisBehindLatest
メトリクスは一貫して 1,000 (1 秒あたり) を超えています。
以下が真の場合は、アプリケーションの InputParallelism
設定を増やす必要はありません。
MillisBehindLatest
メトリクスが減少しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データの遅れを取り戻していることを意味します。MillisBehindLatest
メトリクスは一貫して 1,000 (1 秒あたり) を下回っています。
CloudWatch の使用方法の詳細については、「CloudWatch ユーザーガイド」を参照してください。
複数のアプリケーション内入力ストリームの実装
「CreateApplication」を使用してアプリケーションを作成する際、アプリケーション内入力ストリームの数を設定できます。この数は、「UpdateApplication」を使用してアプリケーションを作成した後に設定します。
注記
InputParallelism
設定は、Amazon Kinesis Data Analytics API または AWS CLI を使ってのみ設定できます。AWS Management Console を使用してこの設定を行うことはできません。AWS CLI のセットアップに関する詳細については、「ステップ 2: AWS Command Line Interface (AWS CLI) をセットアップする」を参照してください。
新しいアプリケーションの入力ストリームカウントの設定
次の例では、API アクション (CreateApplication
) を使用して、新しいアプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。
CreateApplication
の詳細については、「CreateApplication」を参照してください。
{ "ApplicationCode": "
<The SQL code the new application will run on the input stream>
", "ApplicationDescription": "<A friendly description for the new application>
", "ApplicationName": "<The name for the new application>
", "Inputs": [ { "InputId": "ID for the new input stream
", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }
既存アプリケーションの入力ストリームカウントの設定
次の例では、API アクション (UpdateApplication
) を使用して、既存アプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。
Update_Application
の詳細については、「UpdateApplication」を参照してください。
{ "InputUpdates": [ { "InputId": "
yourInputId
", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }
Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス
複数のアプリケーション内入力ストリームをアプリケーションで使用するには、別のストリームから明示的に選択する必要があります。次のコード例では、入門チュートリアルで作成した複数の入力ストリームをアプリケーションでクエリを行う方法について説明します。
次の例では、in_application_stream001
という 1 つのアプリケーション内ストリームに結合される前に、まず COUNT を使用して各ソースストリームが集約されます。事前にソースストリームを集約すると、結合されたアプリケーション内ストリームで、負荷をかけ過ぎることなく複数のストリームからのトラフィックを処理しやすくなります。
注記
この例を実行して、両方のアプリケーション内入力ストリームから結果を得るには、ソースストリームのシャード数とアプリケーションの InputParallelism
パラメータを両方とも更新します。
CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;
前述のコード例では、以下のような出力を in_application_stream001
に生成します。

追加の考慮事項
複数の入力ストリームを使用する場合は、以下の点に注意してください。
アプリケーション内入力ストリームの最大数は 64 です。
アプリケーション内入力ストリームは、アプリケーションの入力ストリームのシャード間で均等に分散されます。
アプリケーション内ストリームの追加により向上するパフォーマンスは、直線的にスケールしません。つまり、アプリケーション内ストリームの数を 2 倍にしても、スループットは 2 倍になりません。一般的な行サイズを使用すると、アプリケーション内ストリームはそれぞれ、1 秒あたり約 5,000~15,000 行のスループットを達成します。アプリケーション内ストリームカウントを 10 に増やすことによって、1 秒あたり 20,000~30,000 行のスループットを達成できます。スループット速度は、入力ストリームのフィールドのカウント、データ型、サイズによって異なります。
一部の集計関数 (AVG) では、別のシャードに分割されている入力ストリームに適用されると、予期しない結果が生成される場合があります。集計ストリームに結合する前に、個々のシャードで集計オペレーションを実行する必要があるため、レコードが多く含まれているストリームに関係なく加重される場合があります。
入力ストリームの数を増やした後にアプリケーションのパフォーマンスが低下し続ける (高い
MillisBehindLatest
メトリクスにより反映される) 場合は、Kinesis 処理ユニット (KPU) の上限に達している可能性があります。詳細については、「アプリケーションを自動的にスケーリングしてスループットを向上させる」を参照してください。