经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 1:准备数据
在为此示例创建 Amazon Kinesis Data Analytics 应用程序前,需要先创建一个 Kinesis 数据流,以作为应用程序的流式传输源。另外,您还需运行 Python 代码来将模拟血压数据写入流中。
步骤 1.1:创建 Kinesis 数据流
在此部分中,您创建一个名为 ExampleInputStream
的 Kinesis 数据流。您可以使用 AWS Management Console或 AWS CLI 创建该数据流。
-
使用控制台:
登录到 AWS Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 数据流。然后选择创建 Kinesis 流。
-
对于名称,请键入
ExampleInputStream
。对于分片数,请键入1
。
-
或者,要使用 AWS CLI 创建数据流,请运行以下命令:
$ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
步骤 1.2:将示例记录写入输入流
在此步骤中,您运行 Python 代码以不断生成示例记录并将其写入您创建的数据流中。
-
安装 Python 和 pip。
有关安装 Python 的信息,请参阅 Python
。 您可以使用 pip 安装依赖项。有关安装 pip 的信息,请参阅 pip 文档中的安装
。 -
运行以下 Python 代码。可以将区域改为您要用于此示例的区域。代码中的
put-record
命令将 JSON 记录写入到流。from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = "LOW" normal = "NORMAL" high = "HIGH" def get_blood_pressure(pressure_type): pressure = {"BloodPressureLevel": pressure_type.value} if pressure_type == PressureType.low: pressure["Systolic"] = random.randint(50, 80) pressure["Diastolic"] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure["Systolic"] = random.randint(90, 120) pressure["Diastolic"] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure["Systolic"] = random.randint(130, 200) pressure["Diastolic"] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal ) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))