慎重に検討した結果、アプリケーションの Amazon Kinesis Data Analytics は 2 つのステップSQLで中止することにしました。
1. 2025 年 10 月 15 日以降、SQLアプリケーションの新しい Kinesis Data Analytics を作成することはできません。
2. 2026 年 1 月 27 日以降、アプリケーションを削除します。SQL アプリケーションの Amazon Kinesis Data Analytics を開始または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは終了します。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の中止」を参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
ステップ 1: 入力ストリームと出力ストリームを作成する
ホットスポット例用の Amazon Kinesis Data Analytics アプリケーションを作成する前に、2 つの Kinesis データストリームを作成する必要があります。ストリームの 1 つはアプリケーションのストリーミングソースとして設定し、もう 1 つのストリームは Kinesis Data Analytics がアプリケーション出力を永続化する宛先として設定します。
ステップ 1.1: Kinesis データストリームを作成する
このセクションでは、2 つの Kinesis データストリーム (ExampleInputStream
および ExampleOutputStream
) を作成します。
コンソールまたは AWS CLI を使用してこれらのデータストリームを作成します。
-
コンソールを使用してデータストリームを作成するには
AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis
) を開きます。 -
ナビゲーションペインで、[データストリーム] を選択します。
-
[Kinesis ストリームの作成] を選択し、
ExampleInputStream
という名前の 1 つのシャードを持つストリームを作成します。 -
前のステップを繰り返し、
ExampleOutputStream
という名前の 1 つのシャードを持つストリームを作成します。
-
AWS CLI を使用してデータストリームを作成するには
-
次の Kinesis
create-stream
AWS CLI コマンドを使用して、ストリーム (ExampleInputStream
およびExampleOutputStream
) を作成します。アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名をExampleOutputStream
に変更して同じコマンドを実行します。$ aws kinesis create-stream \ --stream-name
ExampleInputStream
\ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-nameExampleOutputStream
\ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
ステップ 1.2: 入力ストリームにサンプルレコードを書き込みます
このステップでは、Python コードを実行してサンプルレコードを連続生成し、ExampleInputStream
ストリームに書き込みます。
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
-
Python および
pip
をインストールします。Python のインストールについては、Python
ウェブサイトをご覧ください。 pip を使用して依存関係をインストールできます。pip のインストールについては、pip ウェブサイトの「Installation
」を参照してください。 -
以下の Python コードを実行します。このコードは以下の処理を実行します。
-
潜在的なホットスポットを (X、Y) 平面のどこかに生成します。
-
ホットスポットごとに 1000 ポイントのセットを生成します。これらのポイントのうち、20 パーセントがホットスポットの周囲にクラスター化されています。残りはスペース全体でランダムに生成されます。
-
put-record
コマンドは、ストリームに JSON レコードを書き込みます。
重要
このファイルには AWS 認証情報が含まれているため、このファイルをウェブサーバーにアップロードしないでください。
import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
-
次のステップ
ステップ 2: Kinesis Data Analytics アプリケーションを作成する