创建并运行适用于 Python 的 Apache Flink 应用程序的托管服务 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建并运行适用于 Python 的 Apache Flink 应用程序的托管服务

在本节中,您将创建适用于 Python 应用程序的 Apache Flink 托管服务,该应用程序以 Kinesis 流作为源和接收器。

创建依赖资源

在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:

  • 两个 Kinesis 流用于输入和输出。

  • 用于存储应用程序代码的 Amazon S3 存储桶。

注意

本教程假设您在 us-east-1 区域部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。

创建两个 Kinesis 直播

在本练习中创建适用于 Apache Flink 的托管服务应用程序之前,请在要用于部署应用程序的同一区域(本示例中为 us-east-1ExampleOutputStream)中创建两个 Kinesis 数据流(ExampleInputStream和)。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流

创建数据流 (AWS CLI)
  1. 要创建第一个直播 (ExampleInputStream),请使用以下 Amazon Kinesis 命令create-stream AWS CLI 。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

创建 Amazon S3 存储桶

您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:

  • 《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。为 Amazon S3 存储桶指定一个全球唯一的名称,例如附加您的登录名。

    注意

    确保在本教程中使用的区域 (us-east-1) 中创建 S3 存储桶。

其他资源

在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/<my-application> 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

设置本地开发环境

为了进行开发和调试,你可以在你的机器上运行 Python Flink 应用程序。你可以使用你选择的 Python 从命令行启动应用程序,python main.py也可以使用你选择的 Python IDE 启动应用程序。

注意

在你的开发机器上,你必须安装 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我们建议您使用IDE诸如PyCharmVisual Studio 代码之类的代码。要验证您是否满足所有先决条件,请在继续操作满足完成练习的先决条件之前参阅。

要开发您的应用程序并在本地运行它,您必须安装 Flink Python 库。

  1. 使用 VirtualEnv、Conda 或任何类似的 Python 工具创建独立的 Python 环境。

  2. 在该环境中安装 PyFlink 库。使用与 Apache Flink 的亚马逊托管服务中使用的相同 Apache Flink 运行时版本。当前,建议的运行时间为 1.19.1。

    $ pip install apache-flink==1.19.1
  3. 运行应用程序时,请确保环境处于活动状态。如果您在中运行应用程序IDE,请确保使用环境作为运行时。IDE该过程取决于您IDE正在使用的。

    注意

    您只需要安装 PyFlink 库即可。您无需在计算机上安装 Apache Flink 集群。

对您的 AWS 会话进行身份验证

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有有效的 AWS 经过身份验证的会话,并具有写入 Kinesis 数据流的权限。使用以下步骤对您的会话进行身份验证:

  1. 如果您没有配置带有有效凭据 AWS CLI 的命名配置文件,请参阅设置 AWS Command Line Interface (AWS CLI)

  2. 通过发布以下测试记录,验证您的配置 AWS CLI 是否正确,并且您的用户有权写入 Kinesis 数据流:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 如果你IDE有插件要集成 AWS,你可以用它来将证书传递给在中运行的应用程序IDE。有关更多信息,请参见适用于 Visual Studio 的AWS 工具AWS 包 PyCharm、适用于 Visual Studio 代码AWS 工具包和适用于 Intelli IDEA J 的工具包

下载并查看 Apache Flink 流式传输 Python 代码

此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. 导航到 ./python/GettingStarted 目录。

查看应用程序组件

应用程序代码位于main.py。我们使用 Python 中的SQL嵌入式来定义应用程序的流程。

注意

为了优化开发者体验,该应用程序设计为无需更改任何代码即可在适用于 Apache Flink 的亚马逊托管服务上运行,也可以在本地运行,以便在您的计算机上进行开发。应用程序使用环境变量IS_LOCAL = true来检测何时在本地运行。您必须在 shell 上IS_LOCAL = true或在的运行配置中设置环境变量IDE。

  • 应用程序设置执行环境并读取运行时配置。要在适用于 Apache Flink 的亚马逊托管服务上和本地运行,应用程序会检查变量。IS_LOCAL

    • 以下是在适用于 Apache Flink 的亚马逊托管服务中运行应用程序时的默认行为:

      1. 加载随应用程序打包的依赖关系。有关更多信息,请参阅(链接)

      2. 从您在适用于 Apache 的亚马逊托管服务 Flink 应用程序中定义的运行时属性加载配置。有关更多信息,请参阅(链接)

    • 当应用程序检测到你IS_LOCAL = true何时在本地运行应用程序时:

      1. 从项目加载外部依赖关系。

      2. 从项目中包含application_properties.json的文件加载配置。

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • 该应用程序使用 Kinesis 连接器定义带有CREATE TABLE语句的源表。此表从输入 Kinesis 流中读取数据。应用程序从运行时配置中获取流的名称、区域和初始位置。

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • 在此示例中,应用程序还使用 Kinesis 连接器定义了一个接收表。这个故事将数据发送到输出的 Kinesis 流。

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • 最后,应用程序从源表中执行一个 t SQL ha INSERT INTO... t the sink 表。在更复杂的应用程序中,在写入接收器之前,您可能还有其他步骤来转换数据。

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • 您必须在main()函数末尾再添加一个步骤才能在本地运行应用程序:

    if is_local: table_result.wait()

    如果不使用此语句,则当您在本地运行应用程序时,它会立即终止。在适用于 Apache Flink 的亚马逊托管服务中运行应用程序时,不得执行此语句。

管理JAR依赖关系

PyFlink 应用程序通常需要一个或多个连接器。本教程中的应用程序使用 Kinesis 连接器。由于 Apache Flink 在 Java 中运行JVM,因此无论您是否使用 Python 实现应用程序,连接器都会以JAR文件形式分发。在适用于 Apache Flink 的亚马逊托管服务上部署应用程序时,必须将这些依赖项与应用程序打包。

在此示例中,我们展示了如何使用 Apache Maven 获取依赖项并打包应用程序以在 Apache Flink 的托管服务上运行。

注意

还有其他方法可以获取和打包依赖关系。此示例演示了一种适用于一个或多个连接器的方法。它还允许您在本地运行应用程序以进行开发,也可以在适用于 Apache Flink 的托管服务上运行应用程序,而无需更改代码。

使用 pom.xml 文件

Apache Maven 使用该pom.xml文件来控制依赖关系和应用程序打包。

任何JAR依赖关系都是在<dependencies>...</dependencies>块中的pom.xml文件中指定的。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

要查找要使用的正确构件和连接器版本,请参阅将 Apache Flink 连接器与托管服务一起使用 Apache Flink。请务必参考你正在使用的 Apache Flink 版本。在本示例中,我们使用 Kinesis 连接器。对于 Apache Flink 1.19,连接器版本为。4.3.0-1.19

注意

如果您使用的是 Apache Flink 1.19,则没有专门为此版本发布的连接器版本。使用 1.18 版本发布的连接器。

下载和打包依赖关系

使用 Maven 下载pom.xml文件中定义的依赖关系,然后将其打包给 Python Flink 应用程序。

  1. 导航到包含名为的 Python 入门项目的目录python/GettingStarted

  2. 运行以下命令:

$ mvn package

Maven 创建了一个名./target/pyflink-dependencies.jar为的新文件。当您在计算机上进行本地开发时,Python 应用程序会查找此文件。

注意

如果您忘记运行此命令,则在尝试运行应用程序时,它将失败并显示错误:找不到标识符 “kinesis” 的任何工厂

将示例记录写入输入流

在本节中,您将向流发送示例记录以供应用程序处理。您可以通过两种方式生成示例数据,要么使用 Python 脚本,要么使用 Kinesis 数据生成器。

使用 Python 脚本生成示例数据

您可以使用 Python 脚本将示例记录发送到数据流。

注意

要运行这个 Python 脚本,你必须使用 Python 3.x 并安装AWS SDK适用于 Python(Boto)的库。

要开始向 Kinesis 输入流发送测试数据,请执行以下操作:

  1. 从数据生成器 GitHub 存储库下载数据生成器 stock.py Python 脚本。

  2. 运行 stock.py 脚本:

    $ python stock.py

在完成本教程的其余部分的同时,请保持脚本运行。现在你可以运行你的 Apache Flink 应用程序了。

使用 Kinesis 数据生成器生成示例数据

除了使用 Python 脚本之外,您还可以使用 Kinesis 数据生成器(也在托管版本中提供)将随机样本数据发送到流中。Kinesis 数据生成器可在您的浏览器中运行,您无需在计算机上安装任何东西。

要设置和运行 Kinesis 数据生成器,请执行以下操作:

  1. 按照 Kinesis 数据生成器文档中的说明设置该工具的访问权限。您将运行一个用于设置用户和密码的 AWS CloudFormation 模板。

  2. 通过模板生成的数据生成器访问 Kinesis 数据URL生成器。 CloudFormation CloudFormation 模板完成后,您可以URL在 “输出” 选项卡中找到。

  3. 配置数据生成器:

    • 区域:选择您在本教程中使用的区域:us-east-1

    • 直播/传送流:选择应用程序将使用的输入流:ExampleInputStream

    • 每秒记录数:100

    • 录制模板:复制并粘贴以下模板:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 测试模板:选择测试模板并验证生成的记录是否与以下内容类似:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 启动数据生成器:选择 “选择发送数据”。

Kinesis 数据生成器现在正在将数据发送到。ExampleInputStream

在本地运行应用程序

您可以在本地测试应用程序,使用命令行运行python main.py或从您的命令行运行IDE。

要在本地运行应用程序,必须安装正确版本的 PyFlink库,如上一节所述。有关更多信息,请参阅(链接)

注意

在继续操作之前,请验证输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您有权从两个流中读取和写入数据。请参阅 对您的 AWS 会话进行身份验证

将 Python 项目导入到你的 IDE

要开始在中处理应用程序IDE,必须将其作为 Python 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将./python/GettingStarted子目录中的内容导入IDE到。

将代码作为现有 Python 项目导入。

注意

导入新 Python 项目的确切过程因IDE您使用的项目而异。

检查本地应用程序配置

在本地运行时,应用程序使用下项目资源文件夹中application_properties.json文件中的配置./src/main/resources。您可以编辑此文件以使用不同的 Kinesis 直播名称或区域。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

在本地运行你的 Python 应用程序

你可以在本地运行应用程序,可以从命令行作为常规 Python 脚本运行,也可以从中运行IDE。

从命令行运行应用程序
  1. 确保独立 Python 环境(例如 Conda 或你安装了 Python Flink 库 VirtualEnv 的地方)当前处于活动状态。

  2. 确保你mvn package至少跑过一次。

  3. 设置 IS_LOCAL = true 环境变量:

    $ export IS_LOCAL=true
  4. 将该应用程序作为常规 Python 脚本运行。

    $python main.py
要从内部运行应用程序 IDE
  1. 将您的配置IDE为使用以下配置运行main.py脚本:

    1. 使用独立的 Python 环境,例如 Conda 或你安装 PyFlink 库 VirtualEnv 的地方。

    2. 使用 AWS 凭据访问输入和输出 Kinesis 数据流。

    3. 设置 IS_LOCAL = true

  2. 设置运行配置的确切过程取决于您的,IDE并且各不相同。

  3. 设置完毕后IDE,运行 Python 脚本,并在应用程序运行IDE时使用您提供的工具。

在本地检查应用程序日志

在本地运行时,除了在应用程序启动时打印和显示的几行之外,应用程序不会在控制台中显示任何日志。 PyFlink 将日志写入安装了 Python Flink 库的目录中的一个文件中。应用程序启动时会打印日志的位置。您也可以运行以下命令来查找日志:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. 列出日志目录中的文件。你通常会找到一个.log文件。

  2. 在应用程序运行时追踪文件:tail -f <log-path>/<log-file>.log.

观察 Kinesis 流中的输入和输出数据

您可以使用 Amazon Kinesis 控制台中的数据查看器观察(生成示例 Python)或 Kinesis 数据生成器(链接)发送到输入流的记录。

要观察记录,请执行以下操作:

停止应用程序在本地运行

停止应用程序在您的中运行IDE。IDE通常提供 “停止” 选项。确切的位置和方法取决于IDE.

Package 你的应用程序代码

在本节中,您将使用 Apache Maven 将应用程序代码和所有必需的依赖项打包到一个.zip 文件中。

再次运行 Maven 软件包命令:

$ mvn package

此命令生成文件target/managed-flink-pyflink-getting-started-1.0.0.zip

将应用程序包上传到 Amazon S3 存储桶

在本节中,您将您在上一节中创建的.zip 文件上传到您在本教程开头创建的亚马逊简单存储服务 (Amazon S3) 存储桶。如果您尚未完成此步骤,请参阅(链接)。

上传应用程序代码JAR文件
  1. 打开 Amazon S3 控制台,网址为https://console.aws.amazon.com/s3/

  2. 选择您之前为应用程序代码创建的存储桶。

  3. 选择上传

  4. 选择 Add files

  5. 导航到上一步中生成的.zip 文件:target/managed-flink-pyflink-getting-started-1.0.0.zip

  6. 在不更改任何其他设置的情况下选择 “上传”。

创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序

您可以使用控制台或 Apache Flink 应用程序创建和配置托管服务。 AWS CLI在本教程中,我们将使用控制台。

创建应用程序

  1. 在 /flink 上打开适用于 Apache Flink 的托管服务控制台 https://console.aws.amazon.com

  2. 确认选择了正确的区域:美国东部(弗吉尼亚北部)us-east-1。

  3. 打开右侧菜单,选择 Apache Flink 应用程序,然后选择 “创建流媒体应用程序”。或者,从初始页面的 “入门” 部分中选择 “创建流媒体应用程序”。

  4. 在 “创建流媒体应用程序” 页面上:

    • 在 “选择一种设置流处理应用程序的方法” 中,选择 “从头开始创建”。

    • 对于 Apache Flink 配置,即应用程序 Flink 版本,请选择 Ap ache Flink 1.19。

    • 对于应用程序配置

      • 对于 应用程序名称 ,输入 MyApplication

      • 对于描述,输入 My Python test app

      • 访问应用程序资源中,选择使用所需策略创建/更新IAM角色 kinesis-analytics-analytics--us-MyApplication east-1

    • 对于应用程序模板设置

      • 对于 “模板”,选择 “开发”。

    • 选择 “创建流媒体应用程序”。

注意

使用控制台为 Apache Flink 应用程序创建托管服务时,您可以选择为应用程序创建IAM角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些IAM资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesisanalytics-MyApplication-us-west-2

适用于 Apache Flink 的亚马逊托管服务以前被称为 Kinesis Data Analytics。kinesis-analytics为了向后兼容,自动生成的资源名称带有前缀。

编辑IAM政策

编辑IAM策略以添加访问 Amazon S3 存储桶的权限。

编辑IAM策略以添加 S3 存储桶权限
  1. 打开IAM控制台,网址为https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-east-1 策略。

  3. 选择 “编辑”,然后选择该JSON选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。替换示例账户 IDs (012345678901) 使用您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 选择下一步,然后选择保存更改

配置应用程序

编辑应用程序配置以设置应用程序代码构件。

配置应用程序
  1. MyApplication页面上,选择配置

  2. 在 “应用程序代码位置” 部分:

    • 对于 Amazon S3 存储桶,请选择您之前为应用程序代码创建的存储桶。选择 “浏览” 并选择正确的存储桶,然后选择 “选择”。不要在存储桶名称上选择。

    • 在 Amazon S3 对象的路径中,输入managed-flink-pyflink-getting-started-1.0.0.zip

  3. 对于访问权限,请选择kinesis-analytics-MyApplication-us-east-1使用所需策略创建/更新IAM角色

  4. 移至 “运行时属性”,并保留所有其他设置的默认值。

  5. 选择 “添加新项目”,然后添加以下每个参数:

    组 ID
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. 请勿修改任何其他部分,然后选择 “保存更改”。

注意

当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

应用程序现已配置完毕,可以运行了。

运行应用程序
  1. 在适用于 Apache Flink 的亚马逊托管服务的控制台上,选择 “我的应用程序”,然后选择 “运行”。

  2. 在下一页的应用程序还原配置页面上,选择使用最新快照运行,然后选择运行

    应用程序状态” 详细信息会从Ready到,Starting然后转换到应用程序启动Running时。

当应用程序处于Running状态时,您现在可以打开 Flink 控制面板。

打开 控制面板
  1. 选择 “打开 Apache Flink 控制面板”。仪表板将在新页面上打开。

  2. 在 “正在运行的作业” 列表中,选择您可以看到的单个作业。

    注意

    如果您设置了 Runtime 属性或对IAM策略的编辑不正确,则应用程序状态可能会变为Running,但是 Flink 控制面板显示作业正在持续重启。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障。

    发生这种情况时,请查看 Flink 控制面板中的 “异常” 选项卡以查看问题的原因。

观察正在运行的应用程序的指标

在该MyApplication页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序中的一些基本指标。

查看指标
  1. 在 “刷新” 按钮旁边,从下拉列表中选择 10 秒

  2. 当应用程序运行且运行正常时,您可以看到正常运行时间指标不断增加。

  3. 完全重启指标应为零。如果它增加,则配置可能会出现问题。要调查问题,请查看 Flink 控制面板上的 “异常” 选项卡。

  4. 在运行良好的应用程序中,失败的检查点数指标应为零。

    注意

    此仪表板显示一组固定的指标,粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。

观察 Kinesis 直播中的输出数据

确保您仍在使用 Python 脚本或 Kinesis 数据生成器将数据发布到输入中。

现在,您可以使用中的数据查看器来观察在 Apache Flink 托管服务上运行的应用程序的输出 https://console.aws.amazon.com/kinesis/,就像之前所做的那样。

查看输出
  1. 在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com

  2. 确认该区域与您运行本教程时使用的区域相同。默认情况下,它是 US-East-1US 东部(弗吉尼亚北部)。如有必要,请更改区域。

  3. 选择数据流

  4. 选择要观看的直播。在本教程中,请使用 ExampleOutputStream

  5. 选择数据查看器选项卡。

  6. 选择任意碎片,保持 “最新” 作为起始位置,然后选择 “获取记录”。您可能会看到 “未找到该请求的记录” 错误。如果是,请选择 “重试获取记录”。发布到直播显示屏的最新记录。

  7. 在 “数据” 列中选择值以检查记录的JSON格式内容。

停止应用程序

要停止应用程序,请转到名为的 Apache Flink 托管服务应用程序的控制台页面。MyApplication

停止应用程序
  1. 从 “操作” 下拉列表中,选择 “停止”。

  2. 应用程序详细信息中的状态Running变为Stopping,然后转换到应用程序完全停止Ready时。

    注意

    别忘了停止从 Python 脚本或 Kinesis 数据生成器向输入流发送数据。

后续步骤

清理 AWS 资源