例: イベントのタイムスタンプを使用したタンブリングウィンドウ - Amazon Kinesis Data Analytics for SQL Applications デベロッパーガイド

慎重に検討した結果、2 つのステップでSQLアプリケーションの Amazon Kinesis Data Analytics を中止することにしました。

1. 2025 年 10 月 15 日以降、SQLアプリケーション用の新しい Kinesis Data Analytics を作成することはできません。

2. 2026 年 1 月 27 日以降、アプリケーションは削除されます。SQL アプリケーションの Amazon Kinesis Data Analytics を起動または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは利用できなくなります。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の停止」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

例: イベントのタイムスタンプを使用したタンブリングウィンドウ

ウィンドウクエリが各ウィンドウを重複しない方式で処理する場合、ウィンドウはタンブリングウィンドウと呼ばれます。詳細については、「タンブリングウィンドウ (GROUP BY を使用した集計)」を参照してください。この Amazon Kinesis Data Analytics の例は、イベントのタイムスタンプを使用するタンブリングウィンドウを示します。このタイムスタンプは、ストリーミングデータに含まれているユーザーが作成したタイムスタンプです。アプリケーションがレコードを受信したときに Kinesis Data Analytics が作成する ROWTIME タイムスタンプだけを使用するのではなく、こうした方法を用います。アプリケーションの受信時ではなく、イベント発生時に基づいて集約を作成する場合は、ストリーミングデータでイベントのタイムスタンプを使用します。この例では、ROWTIME 値によって、1 分ごとに集計がトリガーされ、レコードは、ROWTIME とそこに含まれるイベント時間の両方で集計されます。

この例では、次のレコードを Amazon Kinesis ストリームに書き込みます。EVENT_TIME 値は、イベント発生時からレコードが Kinesis Data Analytics に取り込まれるまで、遅延が生じる可能性のあるプロセスや送信の遅延をシミュレートするために、過去 5 秒間に設定されます。

{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65} {"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61} {"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48} {"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64} ...

次に、AWS Management Console でKinesis Data Analytics アプリケーションを作成します。 データストリームをストリーミングソースとして使用します。検出プロセスでストリーミングソースのサンプルレコードが読み込まれます。次のように、アプリケーション内スキーマに 3 つの列 (EVENT_TIMETICKERPRICE) あると推察します。

イベント時間列、ティッカー列、料金列を含むアプリケーション内スキーマを表示するコンソールのスクリーンショット。

データのウィンドウ集約を作成するには、アプリケーションコードで MIN 関数および MAX 関数を使用します。続いて、次のスクリーンショットに示すように、生成されたデータを別のアプリケーション内ストリームに挿入します。

アプリケーション内ストリームに結果のデータを表示するコンソールのスクリーンショット。

次の手順では、イベント時間に基づき、タンブリングウィンドウの入力ストリームに値を集約する Kinesis Data Analytics アプリケーションを作成します。

ステップ 1: Kinesis データストリームを作成する

次のように、Amazon Kinesis データストリームを作成して、レコードを追加します。

  1. AWS Management Consoleにサインインして、Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. ナビゲーションペインで、[データストリーム] を選択します。

  3. [Kinesis ストリームの作成] を選択後、1 つのシャードがあるストリームを作成します。詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Create a Stream」を参照してください。

  4. 本稼働環境の Kinesis データストリームにレコードを書き込むには、Kinesis Client Library または Kinesis Data Streams API を使用することをお勧めします。わかりやすいように、この例では、以下の Python スクリプトを使用してレコードを生成します。サンプルのティッカーレコードを入力するには、このコードを実行します。このシンプルなコードによって、ランダムなティッカーレコードが連続してストリームに書き込まれます。後のステップでアプリケーションスキーマを生成できるように、スクリプトを実行したままにしておきます。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

ステップ 2: Kinesis Data Analytics アプリケーションを作成する

次のように Kinesis Data Analytics アプリケーションを作成します。

  1. https://console.aws.amazon.com/kinesisanalytics にある Managed Service for Apache Flink コンソールを開きます。

  2. [アプリケーションの作成] を選択し、アプリケーション名を入力して、[アプリケーションの作成] を選択します。

  3. アプリケーション詳細ページで、[ストリーミングデータの接続] を選択してソースに接続します。

  4. [ソースに接続] ページで、以下の操作を実行します。

    1. 前のセクションで作成したストリームを選択します。

    2. [スキーマの検出] を選択します。作成されたアプリケーション内ストリーム用の推測スキーマと、推測に使用されたサンプルレコードがコンソールに表示されるまで待ちます。推測されたスキーマには 3 つの列があります。

    3. [スキーマの編集] を選択します。[EVENT_TIME] 列の [列のタイプ] を TIMESTAMP に変更します。

    4. [Save schema and update stream samples] を選択します。コンソールでスキーマが保存されたら、[終了] を選択します。

    5. [Save and continue] を選択します。

  5. アプリケーション詳細ページで、[SQL エディタに移動] を選択します。アプリケーションを起動するには、表示されたダイアログボックスで [はい、アプリケーションを起動します] を選択します。

  6. SQL エディタで、次のように、アプリケーションコードを作成してその結果を確認します。

    1. 次のアプリケーションコードをコピーしてエディタに貼り付けます。

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND), TICKER, MIN(PRICE) AS MIN_PRICE, MAX(PRICE) AS MAX_PRICE FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
    2. [Save and run SQL] を選択します。

      [リアルタイム分析] タブに、アプリケーションで作成されたすべてのアプリケーション内ストリームが表示され、データを検証できます。