慎重な検討の結果、Amazon Kinesis Data Analytics for SQL アプリケーションのサポートは終了することになりました。サポート終了は次の 2 段階で行われます。
1. 2025 年 10 月 15 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできなくなります。
2. 2026 年 1 月 27 日以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了」を参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
アプリケーション入力の設定
Amazon Kinesis Data Analytics アプリケーションでは、1 つのストリーミングソースから入力を受け取ることができます。また、オプションで 1 つのリファレンスデータソースを使用できます。詳細については、「Amazon Kinesis Data Analytics for SQL Applications: 仕組み」を参照してください。このトピックのセクションでは、アプリケーション入力ソースについて説明します。
トピック
ストリーミングソースの設定
アプリケーションを作成するときに、ストリーミングソースを指定します。アプリケーションを作成した後に入力を変更することもできます。Amazon Kinesis Data Analytics では、アプリケーションに対して以下のストリーミングソースがサポートされています。
-
Kinesis データストリーム
-
Firehose 配信ストリーム
注記
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。KinesisFirehoseInput
と一緒に Kinesis Data Analytics for SQL アプリケーションを使用している既存のお客様は、Kinesis Data Analytics を使用して既存のアカウント内で、引き続き KinesisFirehoseInput
でアプリケーションを追加できます。既存のお客様で、KinesisFirehoseInput
と一緒に Kinesis Data Analytics for SQL アプリケーションを使用して新規アカウントを作成される場合は、サービス制限拡大フォームを通してケースを作成できます。詳細については、AWS サポート センター
注記
Kinesis データストリームが暗号化されている場合、Kinesis Data Analytics は暗号化されたストリームのデータにシームレスにアクセスし、それ以上の設定は必要ありません。Kinesis Data Analytics では、Kinesis Data Streams から読み取った暗号化されていないデータは保存されません。詳細については、「Kinesis Data Streams 用のサーバー側の暗号化とは」を参照してください。
Kinesis Data Analytics は、ストリーミングソースに新しいデータがあるか継続的にポーリングし、入力設定に応じてアプリケーション内ストリームに取り込みます。
注記
アプリケーションの入力として Kinesis ストリームを追加しても、ストリーム内のデータには影響を与えません。同じ Kinesis ストリームに Firehose 配信ストリームなどの別のリソースもアクセスした場合、Firehose 配信ストリームと Kinesis Data Analytics アプリケーションの両方が同じデータを受け取ります。ただし、スループットとスロットリングは影響を受ける可能性があります。
アプリケーションコードは、アプリケーション内ストリームをクエリできます。入力設定の一部として以下を指定します。
-
ストリーミングソース – ストリームの Amazon リソースネーム (ARN) と、Kinesis Data Analytics がユーザーに代わってストリームにアクセスするために引き受けることができる IAM ロールを指定します。
-
アプリケーション内ストリーム名のプレフィックス – アプリケーションを起動すると、Kinesis Data Analytics によって指定されたアプリケーション内ストリームが作成されます。この名前を使用して、アプリケーションコードでアプリケーション内ストリームにアクセスします。
オプションで、ストリーミングリソースを複数のアプリケーション内ストリームにマッピングできます。詳細については、「制限」を参照してください。この場合、Amazon Kinesis Data Analytics は、
prefix
_001
、prefix
_002
、prefix
_003
などの名前で、アプリケーション内ストリームを指定された数だけ作成します。デフォルトでは、Kinesis Data Analytics はストリーミングソースをprefix
_001
という名前の 1 つのアプリケーション内ストリームにマッピングします。アプリケーション内ストリームに挿入できる行は、速度の制限があります。そのため、Kinesis Data Analytics では複数のアプリケーション内ストリームをサポートして、より高速にアプリケーションにレコードを届けます。アプリケーションがストリーミングソースのデータをアップし続けることができない場合は、並列処理ユニットを追加してパフォーマンスを向上させることができます。
-
マッピングスキーマ – ストリーミングソースでのレコード形式 (JSON、CSV) を指定します。また、ストリームの各レコードが、作成されるアプリケーション内ストリーム内の列にマッピングされる方法も指定します。ここで、列名とデータ型を指定します。
注記
Kinesis Data Analytics は、入力アプリケーション内ストリームの作成時に、識別子 (ストリーム名および列名) に引用符を追加します。このストリームと列をクエリする場合は、完全一致 (大文字と小文字が正確に一致) を使用して引用符内を指定する必要があります。識別子の詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「Identifiers」を参照してください。
Amazon Kinesis Data Analytics コンソールでアプリケーションを作成し、入力を設定できます。その後、コンソールは必要な API コールを行います。アプリケーション入力の設定は、新しいアプリケーション API の作成時、または既存のアプリケーションに入力設定を追加するときに行うことができます。詳細については、CreateApplicationおよびAddApplicationInputを参照してください。以下は Createapplication
API リクエストボディの入力設定部分です。
"Inputs": [
{
"InputSchema": {
"RecordColumns": [
{
"Mapping": "string",
"Name": "string",
"SqlType": "string"
}
],
"RecordEncoding": "string",
"RecordFormat": {
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": "string",
"RecordRowDelimiter": "string"
},
"JSONMappingParameters": {
"RecordRowPath": "string"
}
},
"RecordFormatType": "string"
}
},
"KinesisFirehoseInput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"KinesisStreamsInput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"Name": "string"
}
]
リファレンスソースの設定
オプションで既存のアプリケーションにリファレンスデータソースを追加して、ストリーミングソースから入力されるデータを強化できます。リファレンスデータは Amazon S3 バケット内のオブジェクトとして保存する必要があります。アプリケーションが起動すると、Amazon Kinesis Data Analytics は Amazon S3 オブジェクトを読み取り、アプリケーション内リファレンステーブルを作成します。その後、アプリケーションコードでこれをアプリケーション内ストリームと結合できます。
サポートされている形式 (CSV、JSON) で、Amazon S3 オブジェクトにリファレンスデータを保存します。たとえば、アプリケーションで株注文を分析すると仮定します。ストリーミングソースには次に示すレコード形式があるとします。
Ticker, SalePrice, OrderId
AMZN $700 1003
XYZ $250 1004
...
この場合、会社名などの詳細を株価ティッカーに提供するリファレンスデータソースを保持することを検討する場合があります。
Ticker, Company AMZN, Amazon XYZ, SomeCompany ...
アプリケーションのリファレンスデータソースは、API またはコンソールで追加できます。Amazon Kinesis Data Analytics では、以下の API アクションで、リファレンスデータソースを管理します。
コンソールを使用してリファレンスデータを追加する方法の詳細については、「例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する」を参照してください。
次の点に注意してください:
-
アプリケーションが実行されている場合、Kinesis Data Analytics はアプリケーション内リファレンステーブルを作成し、ただちにリファレンスデータをロードします。
-
アプリケーションが実行されていない (例えば、準備完了状態など) 場合、Kinesis Data Analytics は更新された入力設定を保存するだけです。アプリケーションの実行が始まると、Kinesis Data Analytics はリファレンスデータをテーブルとしてアプリケーションにロードします。
Kinesis Data Analytics がアプリケーション内リファレンステーブルを作成した後で、データを更新するとします。Amazon S3 オブジェクトを更新したり、別の Amazon S3 オブジェクトを使用したいという場合があるかもしれません。この場合は、UpdateApplication を明示的に呼び出すか、コンソールで [アクション]、[リファレンスデータテーブルを同期] の順に選択します。Kinesis Data Analytics では、アプリケーション内リファレンステーブルは自動的に更新されません。
リファレンスデータソースとして作成できる Amazon S3 オブジェクトには、サイズの制限があります。詳細については、「制限」を参照してください。オブジェクトのサイズが制限を超えた場合には、Kinesis Data Analytics でデータをロードできません。アプリケーションの状態は実行中と表示されますが、データが読み込まれません。
リファレンスデータソースを追加する場合は、次の情報を指定します。
-
S3 バケットおよびオブジェクトキー名 – バケット名およびオブジェクトキーに加えて、ユーザーの代わりにオブジェクトを読み取るために Kinesis Data Analytics が引き受けることができる IAM ロールも指定します。
-
アプリケーション内リファレンステーブル名 – Kinesis Data Analytics がこのアプリケーション内テーブルを作成し、Amazon S3 オブジェクトを読み取ってそこに入力します。これは、アプリケーションコードで指定するテーブルの名前です。
-
マッピングスキーマ – レコード形式 (JSON, CSV)、Amazon S3 オブジェクトに保存されたデータのエンコードを記述します。また、各データ要素がどのようにアプリケーション内リファレンステーブルにマッピングされるかも記述します。
以下に、AddApplicationReferenceDataSource
API リクエストの本文を示します。
{
"applicationName": "string",
"CurrentapplicationVersionId": number,
"ReferenceDataSource": {
"ReferenceSchema": {
"RecordColumns": [
{
"IsDropped": boolean,
"Mapping": "string",
"Name": "string",
"SqlType": "string"
}
],
"RecordEncoding": "string",
"RecordFormat": {
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": "string",
"RecordRowDelimiter": "string"
},
"JSONMappingParameters": {
"RecordRowPath": "string"
}
},
"RecordFormatType": "string"
}
},
"S3ReferenceDataSource": {
"BucketARN": "string",
"FileKey": "string",
"ReferenceRoleARN": "string"
},
"TableName": "string"
}
}