

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 例: Stagger Window
<a name="examples-window-stagger"></a>

ウィンドウクエリがそれぞれ固有のパーティションキーごとに別々のウィンドウを処理するとき、一致するキーを持つデータが届くと、このウィンドウは*ずらしウィンドウ*と呼ばれます。詳細については、「[Stagger Windows](stagger-window-concepts.md)」を参照してください。この Amazon Kinesis Data Analytics 例では、EVENT\$1TIME 列と TICKER 列を使用してずらしウィンドウを作成しています。ソースストリームには、同じ EVENT\$1TIME 値と TICKER 値を持つ、6 つのレコードのグループが含まれています。これらは 1 分間以内に届きますが、必ずしも同じ分値で届くわけではありません (例: `18:41:xx`)。

この例では、次のレコードを Kinesis データストリームに次のタイミングで書き込みます。スクリプトは時間をストリームに書き込みませんが、レコードがアプリケーションによって取り込まれた時間が `ROWTIME` フィールドに書き込まれます。

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



次に、Kinesis データストリームをストリーミングソースとして AWS マネジメントコンソール、 で Kinesis Data Analytics アプリケーションを作成します。検出プロセスでストリーミングソースのサンプルレコードが読み込まれます。次のように、アプリケーション内スキーマに 2 つの列 (`EVENT_TIME` および `TICKER`) があると推察します。

![\[料金列とティッカー列を含むアプリケーション内スキーマを表示するコンソールのスクリーンショット。\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


データのウィンドウ集約を作成するには、アプリケーションコードで `COUNT` 関数を使用します。続いて、次のスクリーンショットに示すように、生成されたデータを別のアプリケーション内ストリームに挿入します。



![\[アプリケーション内ストリームに結果のデータを表示するコンソールのスクリーンショット。\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/ex_stagger.png)


次の手順では、EVENT\$1TIME および TICKER に基づき、ずらしウィンドウの入力ストリームに値を集約する Kinesis Data Analytics アプリケーションを作成します。

**Topics**
+ [ステップ 1: Kinesis データストリームを作成する](#examples-stagger-window-1)
+ [ステップ 2: Kinesis Data Analytics アプリケーションを作成する](#examples-stagger-window-2)

## ステップ 1: Kinesis データストリームを作成する
<a name="examples-stagger-window-1"></a>

次のように、Amazon Kinesis データストリームを作成して、レコードを追加します。

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

1. ナビゲーションペインで、[**データストリーム**] を選択します。

1. [**Kinesis ストリームの作成**] を選択後、1 つのシャードがあるストリームを作成します。詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「[Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)」を参照してください。

1. 本稼働環境の Kinesis データストリームにレコードを書き込むには、[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) または [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html) を使用することをお勧めします。わかりやすいように、この例では、以下の Python スクリプトを使用してレコードを生成します。サンプルのティッカーレコードを入力するには、このコードを実行します。このシンプルなコードは、同じランダムな `EVENT_TIME` とティッカーシンボルを持つ 6 つのレコードのグループを、1 分間に渡ってストリームに継続的に書き込みます。後のステップでアプリケーションスキーマを生成できるように、スクリプトを実行したままにしておきます。

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## ステップ 2: Kinesis Data Analytics アプリケーションを作成する
<a name="examples-stagger-window-2"></a>

次のように Kinesis Data Analytics アプリケーションを作成します。

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) にある Managed Service for Apache Flink コンソールを開きます。

1. [**アプリケーションの作成**] を選択し、アプリケーション名を入力して、[**アプリケーションの作成**] を選択します。

1. アプリケーション詳細ページで、[**ストリーミングデータの接続**] を選択してソースに接続します。

1. [**ソースに接続**] ページで、以下の操作を実行します。

   

   1. 前のセクションで作成したストリームを選択します。

   1. [**スキーマの検出**] を選択します。作成されたアプリケーション内ストリーム用の推測スキーマと、推測に使用されたサンプルレコードがコンソールに表示されるまで待ちます。推測されたスキーマには 2 つの列があります。

   1. [**スキーマの編集**] を選択します。[**EVENT\$1TIME**] 列の [**列のタイプ**] を `TIMESTAMP` に変更します。

   1. [**Save schema and update stream samples**] を選択します。コンソールでスキーマが保存されたら、[**終了**] を選択します。

   1. **[保存して続行]** を選択します。

1. アプリケーション詳細ページで、[**SQL エディタに移動**] を選択します。アプリケーションを起動するには、表示されたダイアログボックスで [**はい、アプリケーションを起動します**] を選択します。

1. SQL エディタで、次のように、アプリケーションコードを作成してその結果を確認します。

   1. 次のアプリケーションコードをコピーしてエディタに貼り付けます。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. [**Save and run SQL**] を選択します。

      [**リアルタイム分析**] タブに、アプリケーションで作成されたすべてのアプリケーション内ストリームが表示され、データを検証できます。