创建并运行适用于 Apache Flink 应用程序的托管服务 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建并运行适用于 Apache Flink 应用程序的托管服务

在此步骤中,您将创建一个以 Kinesis 数据流作为源和接收器的适用于 Apache Flink 的托管服务。

创建依赖资源

在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:

  • 两个 Kinesis 数据流用于输入和输出

  • 用于存储应用程序代码的 Amazon S3 存储桶

    注意

    本教程假设您正在美国东部(弗吉尼亚北部)us-east-1 区域部署应用程序。如果您使用其他区域,请相应地调整所有步骤。

创建两个 Amazon Kinesis 数据流

在为本练习创建 Managed Service for Apache Flink 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。

您可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些直播。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。要使用创建直播 AWS CLI,请使用以下命令,根据您用于应用程序的区域进行调整。

创建数据流 (AWS CLI)
  1. 要创建第一个直播 (ExampleInputStream),请使用以下 Amazon Kinesis 命令 create-stream AWS CLI :

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. 要创建应用程序用来写入输出的第二个流,请运行相同的命令,将流名称更改为ExampleOutputStream

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

为应用程序代码创建 Amazon S3 存储桶

您可以使用控制台来创建 Amazon S3 存储桶。要了解如何使用控制台创建 Amazon S3 存储桶,请参阅 Amazon S3 用户指南中的创建存储桶。使用全球唯一名称命名 Amazon S3 存储桶,例如附加您的登录名。

注意

请务必在本教程中使用的区域 (us-east-1) 中创建存储桶。

其他资源

在您创建应用程序时,适用于 Apache Flink 的托管服务会自动创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/<my-application> 的日志组

  • 名为 kinesis-analytics-log-stream 的日志流

设置本地开发环境

为了进行开发和调试,您可以直接从自己选择的计算机上运行 Apache Flink 应用程序IDE。任何 Apache Flink 依赖关系都像使用 Apache Maven 的常规 Java 依赖项一样处理。

注意

在你的开发计算机上,你必须安装 Java JDK 11、Maven 和 Git。我们建议你使用诸如 Eclipse Java Neon 或 IntelliJ 之类的开发环境。IDEA要验证您是否满足所有先决条件,请参阅满足完成练习的先决条件。您无需在计算机上安装 Apache Flink 集群。

对您的 AWS 会话进行身份验证

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有有效的 AWS 经过身份验证的会话,并具有写入 Kinesis 数据流的权限。使用以下步骤对您的会话进行身份验证:

  1. 如果您没有配置带有有效凭据 AWS CLI 的命名配置文件,请参阅设置 AWS Command Line Interface (AWS CLI)

  2. 通过发布以下测试记录,验证您的配置 AWS CLI 是否正确,并且您的用户有权写入 Kinesis 数据流:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 如果你IDE有插件要集成 AWS,你可以用它来将证书传递给在中运行的应用程序IDE。有关更多信息,请参阅 Intelli J AWS 工具包IDEA和适用于 Ecli pse 的AWS 工具包

下载并查看 Apache Flink 流式传输 Java 代码

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. 导航到 amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted 目录。

查看应用程序组件

该应用程序完全是在com.amazonaws.services.msf.BasicStreamingJob课堂上实现的。该main()方法定义了用于处理和运行流数据的数据流。

注意

为了优化开发者体验,该应用程序设计为无需更改任何代码即可在适用于 Apache Flink 的 Amazon 托管服务上运行,也可以在本地运行,以便在您的中进行开发。IDE

  • 要读取运行时配置,使其在适用于 Apache Flink 的 Amazon 托管服务中运行时能够正常运行IDE,应用程序会自动检测它是否在本地独立运行。IDE在这种情况下,应用程序加载运行时配置的方式会有所不同:

    1. 当应用程序检测到它在您的独立模式下运行时IDE,请生成项目资源文件夹中包含的application_properties.json文件。文件内容如下。

    2. 当应用程序在适用于 Apache Flink 的亚马逊托管服务中运行时,默认行为会根据您将在适用于 Apache Flink 的亚马逊托管服务 Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅 创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • main()方法定义应用程序数据流并运行它。

    • 初始化默认的流媒体环境。在此示例中,我们展示了如何创建StreamExecutionEnvironment要与一起使用的 DataSteam API和StreamTableEnvironment要与一起使用的SQL以及表API。这两个环境对象是对同一个运行时环境的两个单独引用,用法不同APIs。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • 加载应用程序配置参数。这将自动从正确的位置加载它们,具体取决于应用程序的运行位置:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • 该应用程序使用 Kinesis Cons umer 连接器定义一个源,用于从输入流中读取数据。输入流的配置在 PropertyGroupId = 中定义InputStream0。直播的名称和区域aws.region分别位于名为stream.name和的属性中。为简单起见,此源将记录读取为字符串。

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • 然后,应用程序使用 Kinesis Streams Sink 连接器定义接收器,将数据发送到输出流。输出流名称和区域在 PropertyGroupId = 中定义OutputStream0,与输入流类似。接收器直接连接到从源获取数据的内部DataStream。在真实的应用程序中,你需要在源和接收器之间进行一些转换。

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • 最后,运行刚才定义的数据流。在定义了数据流所需的所有运算符之后,这必须是该main()方法的最后一条指令:

      env.execute("Flink streaming Java API skeleton");

使用 pom.xml 文件

pom.xml 文件定义了应用程序所需的所有依赖关系,并设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。

  • 有些依赖关系有provided作用域。当应用程序在适用于 Apache Flink 的亚马逊托管服务中运行时,这些依赖关系将自动可用。它们是编译应用程序或在本地运行应用程序所必需的IDE。有关更多信息,请参阅 在本地运行应用程序。确保您使用的 Flink 版本与您将在适用于 Apache Flink 的亚马逊托管服务中使用的运行时版本相同。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • 您必须使用默认作用域向 pom 添加其他 Apache Flink 依赖项,例如此应用程序使用的 K inesis 连接器。有关更多信息,请参阅 使用 Apache Flink 连接器。您还可以添加应用程序所需的任何其他 Java 依赖项。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Maven Java 编译器插件确保代码是针对 Apache Flink 目前支持的JDK版本 Java 11 进行编译的。

  • Maven Shade 插件打包了 fat-jar,但不包括运行时提供的一些库。它还指定了两个变压器:ServicesResourceTransformerManifestResourceTransformer。后者配置包含启动应用程序的main方法的类。如果你重命名了主类,别忘了更新这个转换器。

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

将样本记录写入输入流

在本节中,您将向流发送示例记录以供应用程序处理。您可以通过两种方式生成示例数据,要么使用 Python 脚本,要么使用 Kinesis 数据生成器。

使用 Python 脚本生成示例数据

您可以使用 Python 脚本将示例记录发送到数据流。

注意

要运行这个 Python 脚本,你必须使用 Python 3.x 并安装AWS SDK适用于 Python(Boto)的库。

要开始向 Kinesis 输入流发送测试数据,请执行以下操作:

  1. 从数据生成器 GitHub 存储库下载数据生成器 stock.py Python 脚本。

  2. 运行 stock.py 脚本:

    $ python stock.py

在完成本教程的其余部分的同时,请保持脚本运行。现在你可以运行你的 Apache Flink 应用程序了。

使用 Kinesis 数据生成器生成示例数据

除了使用 Python 脚本之外,您还可以使用 Kinesis 数据生成器(也在托管版本中提供)将随机样本数据发送到流中。Kinesis 数据生成器可在您的浏览器中运行,您无需在计算机上安装任何东西。

要设置和运行 Kinesis 数据生成器,请执行以下操作:

  1. 按照 Kinesis 数据生成器文档中的说明设置该工具的访问权限。您将运行一个用于设置用户和密码的 AWS CloudFormation 模板。

  2. 通过模板生成的数据生成器访问 Kinesis 数据URL生成器。 CloudFormation CloudFormation 模板完成后,您可以URL在 “输出” 选项卡中找到。

  3. 配置数据生成器:

    • 区域:选择您在本教程中使用的区域:us-east-1

    • 直播/传送流:选择应用程序将使用的输入流:ExampleInputStream

    • 每秒记录数:100

    • 录制模板:复制并粘贴以下模板:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 测试模板:选择测试模板并验证生成的记录是否与以下内容类似:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 启动数据生成器:选择 “选择发送数据”。

Kinesis 数据生成器现在正在向... 发送数据。ExampleInputStream

在本地运行应用程序

您可以在本地运行和调试 Flink 应用程序。IDE

注意

在继续操作之前,请验证输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您有权从两个流中读取和写入数据。请参阅 对您的 AWS 会话进行身份验证

设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE Java 开发。确认您满足必需的先决条件。请参阅 满足完成练习的先决条件

将 Java 项目导入到你的 IDE

要开始使用中的应用程序IDE,必须将其作为 Java 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将./java/GettingStarted子目录中的内容导入IDE到。

使用 Maven 将代码作为现有 Java 项目插入。

注意

导入新 Java 项目的确切过程因IDE您使用的项目而异。

检查本地应用程序配置

在本地运行时,应用程序使用下项目资源文件夹中application_properties.json文件中的配置./src/main/resources。您可以编辑此文件以使用不同的 Kinesis 直播名称或区域。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

设置您的IDE运行配置

您可以IDE直接运行和调试 Flink 应用程序,方法是运行主类com.amazonaws.services.msf.BasicStreamingJob,就像运行任何 Java 应用程序一样。在运行应用程序之前,必须设置运行配置。设置取决于IDE您正在使用的。例如,请参阅 IntelliJ 文档中的运行/调试配置。IDEA特别是,您必须设置以下内容:

  1. 将@@ provided依赖项添加到类路径中。这是确保在本地运行时将具有provided作用域的依赖关系传递给应用程序所必需的。如果不进行此设置,应用程序会立即显示class not found错误。

  2. 将访问 Kinesis 直播的 AWS 凭证传递给应用程序。最快的方法是使用适用于 Intelli IDEA J 的AWS 工具包。在 “运行” 配置中使用此IDE插件,您可以选择特定的 AWS 配置文件。 AWS 使用此配置文件进行身份验证。您无需直接传递 AWS 证书。

  3. 验证是否使用 JDK11 IDE 运行应用程序。

在你的中运行应用程序 IDE

为设置运行配置后BasicStreamingJob,您可以像常规 Java 应用程序一样运行或调试它。

注意

你不能直接java -jar ...从命令行运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。

当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 在应用程序启动时通常会发出的一些WARN日志。INFO

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。

要验证应用程序是否正确处理数据,您可以检查输入和输出 Kinesis 流,如下一节所述。

注意

不发出有关流动数据的日志是 Flink 应用程序的正常行为。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。

观察 Kinesis 流中的输入和输出数据

您可以使用 Amazon Kinesis 控制台中的数据查看器观察(生成示例 Python)或 Kinesis 数据生成器(链接)发送到输入流的记录。

观察记录
  1. 在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com

  2. 确认该区域与您运行本教程的区域相同,默认为 us-east-1 美国东部(弗吉尼亚北部)。如果区域不匹配,请更改区域。

  3. 选择 “数据流”。

  4. 选择您要观看的直播,ExampleInputStream或者是 ExampleOutputStream.

  5. 选择 “数据查看器” 选项卡。

  6. 选择任意碎片,保持 “最新” 作为起始位置,然后选择 “获取记录”。您可能会看到 “未找到该请求的记录” 错误。如果是,请选择 “重试获取记录”。发布到直播显示屏的最新记录。

  7. 在 “数据” 列中选择值以检查记录的JSON格式内容。

停止应用程序在本地运行

停止应用程序在您的中运行IDE。IDE通常提供 “停止” 选项。确切的位置和方法取决于IDE你所使用的。

编译并打包您的应用程序代码

在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到文件中。JAR您可以使用 Maven 命令行工具或您的IDE命令行工具编译和打包代码。

要使用 Maven 命令行进行编译和打包,请执行以下操作:

移至包含 Java GettingStarted 项目的目录并运行以下命令:

$ mvn package

要使用以下方法进行编译和打包IDE:

mvn package从你的 IDE Maven 集成中运行。

在这两种情况下,都会创建以下JAR文件:target/amazon-msf-java-stream-app-1.0.jar

注意

从中运行 “构建项目” IDE 可能无法创建该JAR文件。

上传应用程序代码JAR文件

在本节中,您将您在上一节中创建JAR的文件上传到您在本教程开始时创建的亚马逊简单存储服务 (Amazon S3) Simple Service 存储桶。如果您尚未完成此步骤,请参阅(链接)。

上传应用程序代码JAR文件
  1. 打开 Amazon S3 控制台,网址为https://console.aws.amazon.com/s3/

  2. 选择您之前为应用程序代码创建的存储桶。

  3. 选择上传

  4. 选择 Add files

  5. 导航到上一步中生成的JAR文件:target/amazon-msf-java-stream-app-1.0.jar

  6. 在不更改任何其他设置的情况下选择 “上传”。

警告

确保在中选择了正确的JAR文件<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar

target目录还包含您无需上传的其他JAR文件。

创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序

您可以使用控制台或 AWS CLI创建和运行Managed Service for Apache Flink的应用程序。在本教程中,您将使用控制台。

注意

当您使用控制台创建应用程序时,系统会为您创建您的 AWS Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。使用创建应用程序时 AWS CLI,可以单独创建这些资源。

创建应用程序

创建应用程序
  1. 在 /flink 上打开适用于 Apache Flink 的托管服务控制台 https://console.aws.amazon.com

  2. 确认选择了正确的区域:us-east-1 美国东部(弗吉尼亚北部)

  3. 打开右侧的菜单,选择 Apache Flink 应用程序,然后选择 “创建流媒体应用程序”。或者,在初始页面的 “入门” 容器中选择 “创建流媒体应用程序”。

  4. 在 “创建流媒体应用程序” 页面上:

    • 选择一种设置流处理应用程序的方法:选择从头开始创建

    • Apache Flink 配置,Application Flink 版本:选择 Ap ache Flink 1.19。

  5. 配置您的应用程序

    • 应用程序名称:输入MyApplication

    • 描述:输入My java test app

    • 访问应用程序资源:选择使用所需策略创建/更新IAM角色kinesis-analytics-MyApplication-us-east-1

  6. 为应用程序设置配置模板

    • 模板:选择 “开发”。

  7. 选择页面底部的创建流媒体应用程序

注意

使用控制台为 Apache Flink 应用程序创建托管服务时,您可以选择为应用程序创建IAM角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些IAM资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-east-1

  • 角色:kinesisanalytics-MyApplication-us-east-1

适用于 Apache Flink 的亚马逊托管服务以前被称为 Kinesis Data Analytics。为了向后兼容,自动创建的资源的名称前缀kinesis-analytics-为。

编辑IAM政策

编辑IAM策略以添加访问 Kinesis 数据流的权限。

编辑政策
  1. 打开IAM控制台,网址为https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-east-1 策略。

  3. 选择 “编辑”,然后选择该JSON选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。替换示例账户 IDs (012345678901) 使用您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 选择页面底部的 “下一步”,然后选择 “保存更改”。

配置应用程序

编辑应用程序配置以设置应用程序代码构件。

编辑配置
  1. MyApplication页面上,选择配置

  2. 在 “应用程序代码位置” 部分:

    • 对于 Amazon S3 存储桶,请选择您之前为应用程序代码创建的存储桶。选择 “浏览” 并选择正确的存储桶,然后选择 “选择”。请勿点击存储桶名称。

    • 在 Amazon S3 对象的路径中,输入amazon-msf-java-stream-app-1.0.jar

  3. 对于访问权限,请选择kinesis-analytics-MyApplication-us-east-1使用所需策略创建/更新IAM角色

  4. 在 “运行时属性” 部分中,添加以下属性。

  5. 选择 “添加新项目” 并添加以下每个参数:

    组 ID
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. 请勿修改任何其他部分。

  7. 选择 Save changes(保存更改)

注意

当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

应用程序现已配置完毕,可以运行了。

运行应用程序
  1. 在适用于 Apache Flink 的亚马逊托管服务的控制台上,选择 “我的应用程序”,然后选择 “运行”。

  2. 在下一页的应用程序还原配置页面上,选择使用最新快照运行,然后选择运行

    应用程序状态” 详细信息会从Ready到,Starting然后转换到应用程序启动Running时。

当应用程序处于Running状态时,您现在可以打开 Flink 控制面板。

打开 控制面板
  1. 选择 “打开 Apache Flink 控制面板”。仪表板将在新页面上打开。

  2. 在 “正在运行的作业” 列表中,选择您可以看到的单个作业。

    注意

    如果您设置了 Runtime 属性或对IAM策略的编辑不正确,则应用程序状态可能会变为Running,但是 Flink 控制面板显示作业正在持续重启。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障。

    发生这种情况时,请查看 Flink 控制面板中的 “异常” 选项卡以查看问题的原因。

观察正在运行的应用程序的指标

在该MyApplication页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序中的一些基本指标。

查看指标
  1. 在 “刷新” 按钮旁边,从下拉列表中选择 10 秒

  2. 当应用程序运行且运行正常时,您可以看到正常运行时间指标不断增加。

  3. 完全重启指标应为零。如果它增加,则配置可能会出现问题。要调查问题,请查看 Flink 控制面板上的 “异常” 选项卡。

  4. 在运行良好的应用程序中,失败的检查点数指标应为零。

    注意

    此仪表板显示一组固定的指标,粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。

观察 Kinesis 直播中的输出数据

确保您仍在使用 Python 脚本或 Kinesis 数据生成器将数据发布到输入中。

现在,您可以使用中的数据查看器来观察在 Apache Flink 托管服务上运行的应用程序的输出 https://console.aws.amazon.com/kinesis/,就像之前所做的那样。

查看输出
  1. 在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com

  2. 确认该区域与您运行本教程时使用的区域相同。默认情况下,它是 US-East-1US 东部(弗吉尼亚北部)。如有必要,请更改区域。

  3. 选择 “数据流”。

  4. 选择要观看的直播。在本教程中,请使用 ExampleOutputStream

  5. 选择 “数据查看器” 选项卡。

  6. 选择任意碎片,保持 “最新” 作为起始位置,然后选择 “获取记录”。您可能会看到 “未找到该请求的记录” 错误。如果是,请选择 “重试获取记录”。发布到直播显示屏的最新记录。

  7. 在 “数据” 列中选择值以检查记录的JSON格式内容。

停止应用程序

要停止应用程序,请转到名为的 Apache Flink 托管服务应用程序的控制台页面。MyApplication

停止应用程序
  1. 从 “操作” 下拉列表中,选择 “停止”。

  2. 应用程序详细信息中的状态Running变为Stopping,然后转换到应用程序完全停止Ready时。

    注意

    别忘了停止从 Python 脚本或 Kinesis 数据生成器向输入流发送数据。

后续步骤

清理 AWS 资源