在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:
1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。
2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
步驟 1:建立輸入和輸出串流
為熱點範例建立 Amazon Kinesis Data Analytics 應用程式之前,您必須建立兩個 Kinesis 資料串流。將其中一個串流設定為應用程式的串流來源,另一個串流設定為 Kinesis Data Analytics 保留應用程式輸出的目的地。
步驟 1.1:建立 Kinesis 資料串流
在本節中,建立兩個 Kinesis 資料串流:ExampleInputStream
和 ExampleOutputStream
。
使用主控台或 AWS CLI 建立資料串流。
-
如要使用主控台建立資料串流:
前往 https://console.aws.amazon.com/kinesis/
登入 AWS Management Console 並開啟 Kinesis 主控台。 -
在導覽窗格中選擇 資料串流。
-
選擇建立 Kinesis 串流,然後建立內含一個名為
ExampleInputStream
的碎片之串流。 -
重複上一個步驟,以名為
ExampleOutputStream
的碎片建立串流。
-
使用 AWS CLI 建立資料串流:
-
使用下列 Kinesis
create-stream
AWS CLI 命令建立串流 (ExampleInputStream
和ExampleOutputStream
)。若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,並將串流名稱變更為ExampleOutputStream
。$ aws kinesis create-stream \ --stream-name
ExampleInputStream
\ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-nameExampleOutputStream
\ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
步驟 1.2:將範例記錄寫入輸入串流
在此步驟中,執行 Python 程式碼以持續產生範例記錄,並將這些記錄寫入 ExampleInputStream
串流。
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
-
安裝 Python 與
pip
。如需安裝 Python 的相關資訊,請參閱 Python
網站。 您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊,請參閱 pip 網站的安裝
。 -
執行以下 Python 程式碼。該程式碼會執行下列作業:
-
在 (X, Y) 平面中的某個位置生成潛在熱點。
-
為每個熱點產生一組 1,000 個點。在這些點中,20% 會叢集在熱點周圍。其餘部分會在整個空間內隨機產生。
-
put-record
命令會將 JSON 記錄寫入串流。
重要
請勿將此檔案上傳到 Web 伺服器,因為它包含您的 AWS 憑證。
import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
-
後續步驟
步驟 2:建立 Amazon Kinesis Data Analytics 應用程式