Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行适用于 Python 的 Apache Flink 应用程序的托管服务
在本节中,您将创建适用于 Python 应用程序的 Apache Flink 托管服务,该应用程序以 Kinesis 流作为源和接收器。
本节包含以下步骤。
创建依赖资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
两个 Kinesis 流用于输入和输出。
-
用于存储应用程序代码的 Amazon S3 存储桶。
注意
本教程假设您在 us-east-1 区域部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。
创建两个 Kinesis 直播
在本练习中创建适用于 Apache Flink 的托管服务应用程序之前,请在要用于部署应用程序的同一区域(本示例中为 us-east-1ExampleOutputStream
)中创建两个 Kinesis 数据流(ExampleInputStream
和)。您的应用程序将这些数据流用于应用程序源和目标流。
可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。
创建数据流 (AWS CLI)
-
要创建第一个直播 (
ExampleInputStream
),请使用以下 Amazon Kinesis 命令create-stream
AWS CLI 。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
-
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream
)。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:
-
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。为 Amazon S3 存储桶指定一个全球唯一的名称,例如附加您的登录名。
注意
确保在本教程中使用的区域 (us-east-1) 中创建 S3 存储桶。
其他资源
在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>
的日志组。 -
名为
kinesis-analytics-log-stream
的日志流。
设置本地开发环境
为了进行开发和调试,你可以在你的机器上运行 Python Flink 应用程序。你可以使用你选择的 Python 从命令行启动应用程序,python
main.py
也可以使用你选择的 Python IDE 启动应用程序。
注意
在你的开发机器上,你必须安装 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我们建议您使用IDE诸如PyCharm
安装 PyFlink 库
要开发您的应用程序并在本地运行它,您必须安装 Flink Python 库。
-
使用 VirtualEnv、Conda 或任何类似的 Python 工具创建独立的 Python 环境。
-
在该环境中安装 PyFlink 库。使用与 Apache Flink 的亚马逊托管服务中使用的相同 Apache Flink 运行时版本。当前,建议的运行时间为 1.19.1。
$ pip install apache-flink==1.19.1
-
运行应用程序时,请确保环境处于活动状态。如果您在中运行应用程序IDE,请确保使用环境作为运行时。IDE该过程取决于您IDE正在使用的。
注意
您只需要安装 PyFlink 库即可。您无需在计算机上安装 Apache Flink 集群。
对您的 AWS 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有有效的 AWS 经过身份验证的会话,并具有写入 Kinesis 数据流的权限。使用以下步骤对您的会话进行身份验证:
-
如果您没有配置带有有效凭据 AWS CLI 的命名配置文件,请参阅设置 AWS Command Line Interface (AWS CLI)。
-
通过发布以下测试记录,验证您的配置 AWS CLI 是否正确,并且您的用户有权写入 Kinesis 数据流:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
如果你IDE有插件要集成 AWS,你可以用它来将证书传递给在中运行的应用程序IDE。有关更多信息,请参见适用于 Visual Studio 的AWS 工具AWS 包 PyCharm、适用
于 Visual Studio 代码 的AWS 工具包和适用于 Intelli IDEA J 的工具包 。
下载并查看 Apache Flink 流式传输 Python 代码
此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
导航到
./python/GettingStarted
目录。
查看应用程序组件
应用程序代码位于main.py
。我们使用 Python 中的SQL嵌入式来定义应用程序的流程。
注意
为了优化开发者体验,该应用程序设计为无需更改任何代码即可在适用于 Apache Flink 的亚马逊托管服务上运行,也可以在本地运行,以便在您的计算机上进行开发。应用程序使用环境变量IS_LOCAL =
true
来检测何时在本地运行。您必须在 shell 上IS_LOCAL = true
或在的运行配置中设置环境变量IDE。
-
应用程序设置执行环境并读取运行时配置。要在适用于 Apache Flink 的亚马逊托管服务上和本地运行,应用程序会检查变量。
IS_LOCAL
-
以下是在适用于 Apache Flink 的亚马逊托管服务中运行应用程序时的默认行为:
-
加载随应用程序打包的依赖关系。有关更多信息,请参阅(链接)
-
从您在适用于 Apache 的亚马逊托管服务 Flink 应用程序中定义的运行时属性加载配置。有关更多信息,请参阅(链接)
-
-
当应用程序检测到你
IS_LOCAL = true
何时在本地运行应用程序时:-
从项目加载外部依赖关系。
-
从项目中包含
application_properties.json
的文件加载配置。... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
该应用程序使用 Kinesis
连接器定义带有 CREATE TABLE
语句的源表。此表从输入 Kinesis 流中读取数据。应用程序从运行时配置中获取流的名称、区域和初始位置。table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
-
在此示例中,应用程序还使用 Kinesis 连接器
定义了一个接收表。这个故事将数据发送到输出的 Kinesis 流。 table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
-
最后,应用程序从源表中执行一个 t SQL ha
INSERT INTO...
t the sink 表。在更复杂的应用程序中,在写入接收器之前,您可能还有其他步骤来转换数据。table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
-
您必须在
main()
函数末尾再添加一个步骤才能在本地运行应用程序:if is_local: table_result.wait()
如果不使用此语句,则当您在本地运行应用程序时,它会立即终止。在适用于 Apache Flink 的亚马逊托管服务中运行应用程序时,不得执行此语句。
管理JAR依赖关系
PyFlink 应用程序通常需要一个或多个连接器。本教程中的应用程序使用 Kinesis 连接器
在此示例中,我们展示了如何使用 Apache Maven 获取依赖项并打包应用程序以在 Apache Flink 的托管服务上运行。
注意
还有其他方法可以获取和打包依赖关系。此示例演示了一种适用于一个或多个连接器的方法。它还允许您在本地运行应用程序以进行开发,也可以在适用于 Apache Flink 的托管服务上运行应用程序,而无需更改代码。
使用 pom.xml 文件
Apache Maven 使用该pom.xml
文件来控制依赖关系和应用程序打包。
任何JAR依赖关系都是在<dependencies>...</dependencies>
块中的pom.xml
文件中指定的。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
要查找要使用的正确构件和连接器版本,请参阅将 Apache Flink 连接器与托管服务一起使用 Apache Flink。请务必参考你正在使用的 Apache Flink 版本。在本示例中,我们使用 Kinesis 连接器。对于 Apache Flink 1.19,连接器版本为。4.3.0-1.19
注意
如果您使用的是 Apache Flink 1.19,则没有专门为此版本发布的连接器版本。使用 1.18 版本发布的连接器。
下载和打包依赖关系
使用 Maven 下载pom.xml
文件中定义的依赖关系,然后将其打包给 Python Flink 应用程序。
-
导航到包含名为的 Python 入门项目的目录
python/GettingStarted
。 -
运行以下命令:
$ mvn package
Maven 创建了一个名./target/pyflink-dependencies.jar
为的新文件。当您在计算机上进行本地开发时,Python 应用程序会查找此文件。
注意
如果您忘记运行此命令,则在尝试运行应用程序时,它将失败并显示错误:找不到标识符 “kinesis” 的任何工厂。
将示例记录写入输入流
在本节中,您将向流发送示例记录以供应用程序处理。您可以通过两种方式生成示例数据,要么使用 Python 脚本,要么使用 Kinesis 数据
使用 Python 脚本生成示例数据
您可以使用 Python 脚本将示例记录发送到数据流。
注意
要运行这个 Python 脚本,你必须使用 Python 3.x 并安装AWS SDK适用于 Python(Boto)的
要开始向 Kinesis 输入流发送测试数据,请执行以下操作:
-
从数据生成器 GitHub 存储库
下载数据生成器 stock.py
Python 脚本。 -
运行
stock.py
脚本:$ python stock.py
在完成本教程的其余部分的同时,请保持脚本运行。现在你可以运行你的 Apache Flink 应用程序了。
使用 Kinesis 数据生成器生成示例数据
除了使用 Python 脚本之外,您还可以使用 Kinesis 数据生成器
要设置和运行 Kinesis 数据生成器,请执行以下操作:
-
按照 Kinesis 数据生成器文档
中的说明设置该工具的访问权限。您将运行一个用于设置用户和密码的 AWS CloudFormation 模板。 -
通过模板生成的数据生成器访问 Kinesis 数据URL生成器。 CloudFormation CloudFormation 模板完成后,您可以URL在 “输出” 选项卡中找到。
-
配置数据生成器:
-
区域:选择您在本教程中使用的区域:us-east-1
-
直播/传送流:选择应用程序将使用的输入流:
ExampleInputStream
-
每秒记录数:100
-
录制模板:复制并粘贴以下模板:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
测试模板:选择测试模板并验证生成的记录是否与以下内容类似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
启动数据生成器:选择 “选择发送数据”。
Kinesis 数据生成器现在正在将数据发送到。ExampleInputStream
在本地运行应用程序
您可以在本地测试应用程序,使用命令行运行python main.py
或从您的命令行运行IDE。
要在本地运行应用程序,必须安装正确版本的 PyFlink库,如上一节所述。有关更多信息,请参阅(链接)
注意
在继续操作之前,请验证输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您有权从两个流中读取和写入数据。请参阅 对您的 AWS 会话进行身份验证。
将 Python 项目导入到你的 IDE
要开始在中处理应用程序IDE,必须将其作为 Python 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将./python/GettingStarted
子目录中的内容导入IDE到。
将代码作为现有 Python 项目导入。
注意
导入新 Python 项目的确切过程因IDE您使用的项目而异。
检查本地应用程序配置
在本地运行时,应用程序使用下项目资源文件夹中application_properties.json
文件中的配置./src/main/resources
。您可以编辑此文件以使用不同的 Kinesis 直播名称或区域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
在本地运行你的 Python 应用程序
你可以在本地运行应用程序,可以从命令行作为常规 Python 脚本运行,也可以从中运行IDE。
从命令行运行应用程序
-
确保独立 Python 环境(例如 Conda 或你安装了 Python Flink 库 VirtualEnv 的地方)当前处于活动状态。
-
确保你
mvn package
至少跑过一次。 -
设置
IS_LOCAL = true
环境变量:$ export IS_LOCAL=true
-
将该应用程序作为常规 Python 脚本运行。
$python main.py
要从内部运行应用程序 IDE
-
将您的配置IDE为使用以下配置运行
main.py
脚本:-
使用独立的 Python 环境,例如 Conda 或你安装 PyFlink 库 VirtualEnv 的地方。
-
使用 AWS 凭据访问输入和输出 Kinesis 数据流。
-
设置
IS_LOCAL = true
。
-
-
设置运行配置的确切过程取决于您的,IDE并且各不相同。
-
设置完毕后IDE,运行 Python 脚本,并在应用程序运行IDE时使用您提供的工具。
在本地检查应用程序日志
在本地运行时,除了在应用程序启动时打印和显示的几行之外,应用程序不会在控制台中显示任何日志。 PyFlink 将日志写入安装了 Python Flink 库的目录中的一个文件中。应用程序启动时会打印日志的位置。您也可以运行以下命令来查找日志:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
列出日志目录中的文件。你通常会找到一个
.log
文件。 -
在应用程序运行时追踪文件:
tail -f <log-path>/<log-file>.log
.
观察 Kinesis 流中的输入和输出数据
您可以使用 Amazon Kinesis 控制台中的数据查看器观察(生成示例 Python)或 Kinesis 数据生成器(链接)发送到输入流的记录。
要观察记录,请执行以下操作:
停止应用程序在本地运行
停止应用程序在您的中运行IDE。IDE通常提供 “停止” 选项。确切的位置和方法取决于IDE.
Package 你的应用程序代码
在本节中,您将使用 Apache Maven 将应用程序代码和所有必需的依赖项打包到一个.zip 文件中。
再次运行 Maven 软件包命令:
$ mvn package
此命令生成文件target/managed-flink-pyflink-getting-started-1.0.0.zip
。
将应用程序包上传到 Amazon S3 存储桶
在本节中,您将您在上一节中创建的.zip 文件上传到您在本教程开头创建的亚马逊简单存储服务 (Amazon S3) 存储桶。如果您尚未完成此步骤,请参阅(链接)。
上传应用程序代码JAR文件
打开 Amazon S3 控制台,网址为https://console.aws.amazon.com/s3/
。 -
选择您之前为应用程序代码创建的存储桶。
-
选择上传。
-
选择 Add files。
-
导航到上一步中生成的.zip 文件:
target/managed-flink-pyflink-getting-started-1.0.0.zip
。 -
在不更改任何其他设置的情况下选择 “上传”。
创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序
您可以使用控制台或 Apache Flink 应用程序创建和配置托管服务。 AWS CLI在本教程中,我们将使用控制台。
创建应用程序
在 /flink 上打开适用于 Apache Flink 的托管服务控制台 https://console.aws.amazon.com
-
确认选择了正确的区域:美国东部(弗吉尼亚北部)us-east-1。
-
打开右侧菜单,选择 Apache Flink 应用程序,然后选择 “创建流媒体应用程序”。或者,从初始页面的 “入门” 部分中选择 “创建流媒体应用程序”。
-
在 “创建流媒体应用程序” 页面上:
-
在 “选择一种设置流处理应用程序的方法” 中,选择 “从头开始创建”。
-
对于 Apache Flink 配置,即应用程序 Flink 版本,请选择 Ap ache Flink 1.19。
-
对于应用程序配置:
-
对于 应用程序名称 ,输入
MyApplication
。 -
对于描述,输入
My Python test app
。 -
在访问应用程序资源中,选择使用所需策略创建/更新IAM角色 kinesis-analytics-analytics--us-MyApplication east-1。
-
-
对于应用程序模板设置:
-
对于 “模板”,选择 “开发”。
-
-
选择 “创建流媒体应用程序”。
-
注意
使用控制台为 Apache Flink 应用程序创建托管服务时,您可以选择为应用程序创建IAM角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些IAM资源使用您的应用程序名称和区域命名,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesisanalytics-
MyApplication
-us-west-2
适用于 Apache Flink 的亚马逊托管服务以前被称为 Kinesis Data Analytics。kinesis-analytics
为了向后兼容,自动生成的资源名称带有前缀。
编辑IAM政策
编辑IAM策略以添加访问 Amazon S3 存储桶的权限。
编辑IAM策略以添加 S3 存储桶权限
打开IAM控制台,网址为https://console.aws.amazon.com/iam/
。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1
策略。 -
选择 “编辑”,然后选择该JSON选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。替换示例账户 IDs (
012345678901
) 使用您的账户 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
选择下一步,然后选择保存更改。
配置应用程序
编辑应用程序配置以设置应用程序代码构件。
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 “应用程序代码位置” 部分:
-
对于 Amazon S3 存储桶,请选择您之前为应用程序代码创建的存储桶。选择 “浏览” 并选择正确的存储桶,然后选择 “选择”。不要在存储桶名称上选择。
-
在 Amazon S3 对象的路径中,输入
managed-flink-pyflink-getting-started-1.0.0.zip
。
-
-
对于访问权限,请选择
kinesis-analytics-MyApplication-us-east-1
使用所需策略创建/更新IAM角色。 -
移至 “运行时属性”,并保留所有其他设置的默认值。
-
选择 “添加新项目”,然后添加以下每个参数:
组 ID 键 值 InputStream0
stream.name
ExampleInputStream
InputStream0
flink.stream.initpos
LATEST
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
kinesis.analytics.flink.run.options
python
main.py
kinesis.analytics.flink.run.options
jarfile
lib/pyflink-dependencies.jar
-
请勿修改任何其他部分,然后选择 “保存更改”。
注意
当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已配置完毕,可以运行了。
运行应用程序
-
在适用于 Apache Flink 的亚马逊托管服务的控制台上,选择 “我的应用程序”,然后选择 “运行”。
-
在下一页的应用程序还原配置页面上,选择使用最新快照运行,然后选择运行。
“应用程序状态” 详细信息会从
Ready
到,Starting
然后转换到应用程序启动Running
时。
当应用程序处于Running
状态时,您现在可以打开 Flink 控制面板。
打开 控制面板
-
选择 “打开 Apache Flink 控制面板”。仪表板将在新页面上打开。
-
在 “正在运行的作业” 列表中,选择您可以看到的单个作业。
注意
如果您设置了 Runtime 属性或对IAM策略的编辑不正确,则应用程序状态可能会变为
Running
,但是 Flink 控制面板显示作业正在持续重启。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障。发生这种情况时,请查看 Flink 控制面板中的 “异常” 选项卡以查看问题的原因。
观察正在运行的应用程序的指标
在该MyApplication页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序中的一些基本指标。
查看指标
-
在 “刷新” 按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行正常时,您可以看到正常运行时间指标不断增加。
-
完全重启指标应为零。如果它增加,则配置可能会出现问题。要调查问题,请查看 Flink 控制面板上的 “异常” 选项卡。
-
在运行良好的应用程序中,失败的检查点数指标应为零。
注意
此仪表板显示一组固定的指标,粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。
观察 Kinesis 直播中的输出数据
确保您仍在使用 Python 脚本或 Kinesis 数据生成器将数据发布到输入中。
现在,您可以使用中的数据查看器来观察在 Apache Flink 托管服务上运行的应用程序的输出 https://console.aws.amazon.com/kinesis/
查看输出
在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com
-
确认该区域与您运行本教程时使用的区域相同。默认情况下,它是 US-East-1US 东部(弗吉尼亚北部)。如有必要,请更改区域。
-
选择数据流。
-
选择要观看的直播。在本教程中,请使用
ExampleOutputStream
。 -
选择数据查看器选项卡。
-
选择任意碎片,保持 “最新” 作为起始位置,然后选择 “获取记录”。您可能会看到 “未找到该请求的记录” 错误。如果是,请选择 “重试获取记录”。发布到直播显示屏的最新记录。
-
在 “数据” 列中选择值以检查记录的JSON格式内容。
停止应用程序
要停止应用程序,请转到名为的 Apache Flink 托管服务应用程序的控制台页面。MyApplication
停止应用程序
-
从 “操作” 下拉列表中,选择 “停止”。
-
应用程序详细信息中的状态从
Running
变为Stopping
,然后转换到应用程序完全停止Ready
时。注意
别忘了停止从 Python 脚本或 Kinesis 数据生成器向输入流发送数据。