示例:检测流上的热点 (HOTSPOTS 函数) - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:检测流上的热点 (HOTSPOTS 函数)

Amazon Kinesis Data Analytics 提供了 HOTSPOTS 函数,它可以查找并返回有关数据中的相对密集的区域的信息。有关更多信息,请参阅 Amazon Managed Service for Apache Flink SQL 参考中的 HOTSPOTS

在本练习中,您将编写应用程序代码以查找应用程序的流式传输源上的热点。要设置应用程序,请执行以下步骤:

  1. 设置流式传输源 - 您设置 Kinesis 流并编写示例坐标数据,如下所示:

    {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}

    本示例提供了用于填充流的 Python 脚本。xy 值是随机生成的,一些记录集中在特定位置周围。

    如果脚本有意生成值作为热点的一部分,is_hot 字段将作为指示器提供。这可以帮助您评估热点检测函数是否正常运行。

  2. 创建应用程序 – 使用 AWS Management Console,您随后创建一个 Kinesis Data Analytics 应用程序。通过将流式传输源映射到应用程序内部流 (SOURCE_SQL_STREAM_001) 来配置应用程序输入。在应用程序启动时,Kinesis Data Analytics 持续读取流式传输源,并将记录插入到应用程序内部流中。

    在本练习中,您将为应用程序使用以下代码:

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "x" DOUBLE, "y" DOUBLE, "is_hot" VARCHAR(4), HOTSPOTS_RESULT VARCHAR(10000) ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" FROM TABLE ( HOTSPOTS( CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 1000, 0.2, 17) );

    此代码读取 SOURCE_SQL_STREAM_001 中的行,分析它是否有大量热点,并将生成的数据写入到另一个应用程序内部流 (DESTINATION_SQL_STREAM)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅 应用程序内部流和数据泵

  3. 配置输出 - 您配置应用程序输出以将应用程序中的数据发送到外部目标 (另一个 Kinesis 数据流)。查看热点分数并确定哪些分数表明出现了热点 (并且您需要收到警报)。您可以使用 AWS Lambda 函数进一步处理热点信息并配置警报。

  4. 验证输出 - 示例包含一个 JavaScript 应用程序,该应用程序从输出流读取数据并以图形方式显示它,因此您可以实时查看应用程序生成的热点。

本练习使用美国西部(俄勒冈州)(us-west-2) 区域创建这些流和您的应用程序。如果您使用任何其他区域,请相应地更新代码。