慎重に検討した結果、アプリケーションの Amazon Kinesis Data Analytics は 2 つのステップSQLで中止することにしました。
1. 2025 年 10 月 15 日以降、SQLアプリケーションの新しい Kinesis Data Analytics を作成することはできません。
2. 2026 年 1 月 27 日以降、アプリケーションを削除します。SQL アプリケーションの Amazon Kinesis Data Analytics を開始または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは終了します。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の中止」を参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
例: DateTime 値の変換
Amazon Kinesis Data Analytics は、タイムスタンプへの列の変換をサポートします。たとえば、GROUP BY
列に加えて、独自のタイムスタンプを ROWTIME
句の一部として、別の時間ベースウィンドウとして使用するとします。Kinesis Data Analytics では、日付と時刻のフィールドで機能するオペレーションと SQL 関数がサポートされています。
-
日付と時刻の演算子 日付、時間、間隔の各データ型に算術オペレーションを実行できます。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「Date, Timestamp, and Interval Operators(日付、タイムスタンプ、間隔の演算子)」を参照してください。
-
SQL 関数 – 以下が含まれます。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「Date and Time Functions(日付と時刻の関数)」を参照してください。
-
EXTRACT()
– 日付、時刻、タイムスタンプ、または間隔式から 1 つのフィールドを抽出します。 -
CURRENT_TIME
– クエリが実行された時刻 (UTC) を返します。 -
CURRENT_DATE
– クエリが実行された日付 (UTC) を返します。 -
CURRENT_TIMESTAMP
– クエリが実行されたタイムスタンプ (UTC) を返します。 -
LOCALTIME
– Kinesis Data Analytics が実行されている環境で定義されたとおりにクエリが実行された現在時刻 (UTC) を返します。 -
LOCALTIMESTAMP
– Kinesis Data Analytics が実行されている環境で定義された現在時刻のタイムスタンプ (UTC) を返します。
-
-
SQL 拡張 – 以下が含まれます。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「Date and Time Functions(日付と時刻の関数)」と「Datetime Conversion Functions(Datetime の変換関数)」を参照してください。
-
CURRENT_ROW_TIMESTAMP
– ストリームの各行の新しいタイムスタンプを返します。 -
TSDIFF
– 2 つのタイムスタンプの差をミリ秒で返します。 -
CHAR_TO_DATE
– 文字列を日付に変換します。 -
CHAR_TO_TIME
– 文字列を時間に変換します。 -
CHAR_TO_TIMESTAMP
– 文字列をタイムスタンプに変換します。 -
DATE_TO_CHAR
– 日付を文字列に変換します。 -
TIME_TO_CHAR
– 時間を文字列に変換します。 -
TIMESTAMP_TO_CHAR
– タイムスタンプを文字列に変換します。
-
前述の SQL 関数のほとんどは列を変換する形式を使用します。形式には柔軟性があります。たとえば、yyyy-MM-dd hh:mm:ss
という形式を指定して入力文字列 2009-09-16 03:15:24
をタイムスタンプに変換できます。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「Char To Timestamp(Sys)」を参照してください。
例: 日付の変換
この例では、次のレコードを Amazon Kinesis データストリームに書き込みます。
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...
次に、Kinesis ストリームをストリーミングソースとして使用して、コンソールで Kinesis Data Analytics アプリケーションを作成します。検出プロセスでストリーミングソースのサンプルレコードが読み込まれ、次のように、アプリケーション内スキーマの列が 2 つ (EVENT_TIME
および TICKER
) であると推察します。
次に、SQL 関数でアプリケーションコードを使用して、さまざまな方法で EVENT_TIME
タイムスタンプフィールドを変換します。その後、次のスクリーンショットに示すように生成されたデータを別のアプリケーション内ストリームに挿入します。
ステップ 1: Kinesis データストリームを作成する
Amazon Kinesis データストリームを作成し、次のようにイベント時間およびティッカーレコードを入力します。
にサインイン AWS Management Console し、https://console.aws.amazon.com/kinesis
で Kinesis コンソールを開きます。 -
ナビゲーションペインで、[データストリーム] を選択します。
-
[Kinesis ストリームの作成] を選択し、1 つのシャードがあるストリームを作成します。
-
以下の Python コードを実行して、サンプルデータをストリームに入力します。このシンプルなコードは、ランダムなティッカーシンボルと現在時刻のタイムスタンプを含むレコードを継続的にストリームに書き込みます。
import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
ステップ 2: Amazon Kinesis Data Analytics アプリケーションを作成する
次のようにアプリケーションを作成します。
https://console.aws.amazon.com/kinesisanalytics
にある Managed Service for Apache Flink コンソールを開きます。 -
[アプリケーションの作成] を選択し、アプリケーション名を入力して、[アプリケーションの作成] を選択します。
-
アプリケーション詳細ページで、[ストリーミングデータの接続] を選択してソースに接続します。
-
[ソースに接続] ページで、以下の操作を実行します。
-
前のセクションで作成したストリームを選択します。
-
IAM ロールの作成を選択します。
-
[スキーマの検出] を選択します。作成されたアプリケーション内ストリーム用の推測スキーマと、推測に使用されたサンプルレコードがコンソールに表示されるまで待ちます。推測されたスキーマには 2 つの列があります。
-
[スキーマの編集] を選択します。[EVENT_TIME] 列 の [列のタイプ] を
TIMESTAMP
に変更します。 -
[Save schema and update stream samples] を選択します。コンソールでスキーマが保存されたら、[終了] を選択します。
-
[Save and continue] を選択します。
-
-
アプリケーション詳細ページで、[SQL エディタに移動] を選択します。アプリケーションを起動するには、表示されたダイアログボックスで [はい、アプリケーションを起動します] を選択します。
-
SQL エディタで、次のように、アプリケーションコードを作成してその結果を確認します。
-
次のアプリケーションコードをコピーしてエディタに貼り付けます。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
-
[Save and run SQL] を選択します。[リアルタイム分析] タブに、アプリケーションで作成されたすべてのアプリケーション内ストリームが表示され、データを検証できます。
-