经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 1:创建输入和输出流
在为热点示例创建 Amazon Kinesis Data Analytics 应用程序之前,您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源,并将另一个流配置为目标(Kinesis Data Analytics 在其中永久保存应用程序输出)。
步骤 1.1:创建 Kinesis 数据流
在此部分中,您创建两个 Kinesis 数据流:ExampleInputStream
和 ExampleOutputStream
。
使用控制台或 AWS CLI 创建这些数据流。
-
使用控制台创建数据流:
登录到 AWS Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 数据流。
-
选择创建 Kinesis 流,然后创建带有一个名为
ExampleInputStream
的分片的流。 -
重复上一步骤以创建带有一个名为
ExampleOutputStream
的分片的流。
-
要使用 AWS CLI 创建数据流,请执行以下操作:
-
使用以下 Kinesis
create-stream
AWS CLI 命令创建流(ExampleInputStream
和ExampleOutputStream
)。要创建另一个流 (应用程序将用于写入输出),请运行同一命令以将流名称更改为ExampleOutputStream
。$ aws kinesis create-stream \ --stream-name
ExampleInputStream
\ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-nameExampleOutputStream
\ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
步骤 1.2:将示例记录写入输入流
在此步骤中,您运行 Python 代码以持续生成示例记录并将其写入 ExampleInputStream
流。
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
-
安装 Python 和
pip
。有关安装 Python 的信息,请访问 Python
网站。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 网站上的安装
。 -
运行以下 Python 代码。此代码将执行以下操作:
-
在 (X, Y) 平面上的某个位置生成潜在热点。
-
为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。
-
put-record
命令将 JSON 记录写入到流。
重要
请勿将此文件上传到 Web 服务器,因为它包含您的 AWS 凭证。
import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
-
下一个步骤
步骤 2:创建 Kinesis Data Analytics 应用程序