使用 Apache Beam 创建应用程序 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Apache Beam 创建应用程序

在本练习中,您将创建一个 Managed Service for Apache Flink,该应用程序使用 Apache Beam 转换数据。Apache Beam 是一种用于处理流数据的编程模型。有关在 Managed Service for Apache Flink 中使用 Apache Beam 的信息,请参阅在 Apache Flink 应用程序中使用带有托管服务的 Apache Beam

注意

要为本练习设置所需的先决条件,请先完成教程:开始使用适用于 Apache Flink 的托管服务 DataStream API中的练习。

创建依赖资源

在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:

  • 两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。

  • 存储应用程序代码 (ka-app-code-<username>) 的 Amazon S3 存储桶

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为 ExampleInputStreamExampleOutputStream

  • Amazon Simple Storage Service 用户指南中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如 ka-app-code-<username>

将样本记录写入输入流

在本节中,您使用 Python 脚本将随机字符串写入流,以供应用程序处理。

注意

此部分需要 AWS SDK for Python (Boto)

  1. 使用以下内容创建名为 ping.py 的文件:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. 运行 ping.py 脚本:

    $ python ping.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 导航到 amazon-kinesis-data-analytics-java-examples/Beam 目录。

应用程序代码位于 BasicBeamStreamingJob.java 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Apache Beam 通过调用名ParDo为的自定义转换函数来处理传入的记录。PingPongFn

    调用该PingPongFn函数的代码如下:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • 使用 Apache Beam 的 Managed Service for Apache Flink 应用程序需要以下组件。如果您未在中包含这些组件和版本pom.xml,则您的应用程序会从环境依赖项中加载错误的版本,并且由于版本不匹配,您的应用程序将在运行时崩溃。

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • 除非输入数据是 pingPingPongFn否则转换函数会将输入数据传递到输出流,在这种情况下,它会向输出流发出字符串 pong\ n

    变换函数的代码如下:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

编译应用程序代码

要编译应用程序,请执行以下操作:

  1. 如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅教程:开始使用适用于 Apache Flink 的托管服务 DataStream API中的教程中的完成必需的先决条件

  2. 使用以下命令编译应用程序:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    注意

    提供的源代码依赖于 Java 11 中的库。

编译应用程序会创建应用程序JAR文件 (target/basic-beam-app-1.0.jar)。

上传 Apache Flink 直播 Java 代码

在本节中,您将应用程序代码上传到在创建依赖资源一节中创建的 Amazon S3 存储桶。

  1. 在 Amazon S3 控制台中,选择 ka-app-code-<username>存储桶,然后选择上传

  2. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 basic-beam-app-1.0.jar 文件。

  3. 您无需更改该对象的任何设置,因此,请选择上传

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建并运行适用于 Apache 的托管服务 Flink 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建应用程序

  1. 在 /flink 上打开适用于 Apache Flink 的托管服务控制台 https://console.aws.amazon.com

  2. 在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序

  3. Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 应用程序名称 ,输入 MyApplication

    • 对于运行时系统,请选择 Apache Flink

      注意

      Apache Beam 目前与 Apache Flink 版本 1.19 或更高版本不兼容。

    • 从版本下拉列表中选择 Apache Flink 版本 1.15

  4. 对于访问权限,请选择创建/更新IAM角色kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

使用控制台为 Apache Flink 应用程序创建托管服务时,您可以选择为应用程序创建IAM角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些IAM资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑IAM政策

编辑IAM策略以添加访问 Kinesis 数据流的权限。

  1. 打开IAM控制台,网址为https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. 摘要 页面上,选择 编辑策略。选择JSON选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。替换示例账户 IDs (012345678901) 使用您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

配置应用程序

  1. MyApplication页面上,选择配置

  2. 配置应用程序 页面上,提供 代码位置

    • 对于Amazon S3 存储桶,请输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入basic-beam-app-1.0.jar

  3. 在 “应用程序资源访问权限” 下,选择 “访问权限” 的 “创建/更新IAM角色kinesis-analytics-MyApplication-us-west-2

  4. 输入以下信息:

    组 ID
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. 监控 下,确保 监控指标级别 设置为 应用程序

  6. 要进行CloudWatch 日志记录,请选中 “启用” 复选框。

  7. 选择更新

注意

当您选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

运行应用程序

可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。

您可以在 CloudWatch 控制台上查看托管服务的 Apache Flink 指标,以验证应用程序是否正常运行。

清理 AWS 资源

本节包括清理在 Tumbling Window 教程中创建的 AWS 资源的过程。

删除你的 Apache 托管服务 Flink 应用程序

  1. 在 /flink 上打开适用于 Apache Flink 的托管服务控制台 https://console.aws.amazon.com

  2. 在 Apache Flink 的托管服务面板中,选择。MyApplication

  3. 在应用程序的页面中,选择 删除,然后确认删除。

删除你的 Kinesis 数据流

  1. 在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com

  2. 在 Kinesis Data Streams 面板中,ExampleInputStream选择。

  3. 在该ExampleInputStream页面中,选择 “删除 Kinesis Stream”,然后确认删除。

  4. Kinesis 直播页面中 ExampleOutputStream,选择,选择操作,选择删除,然后确认删除。

删除您的 Amazon S3 对象和存储桶

  1. 打开 Amazon S3 控制台,网址为https://console.aws.amazon.com/s3/

  2. 选择 ka-app-code-<username> 桶。

  3. 选择 删除,然后输入存储桶名称以确认删除。

删除您的IAM资源

  1. 打开IAM控制台,网址为https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service--us-MyApplication west-2 策略

  5. 选择 策略操作,然后选择 删除

  6. 在导航栏中,选择 角色

  7. 选择 k inesis-analytics-us-west-2 角色MyApplication

  8. 选择 删除角色,然后确认删除。

删除您的 CloudWatch 资源

  1. 打开 CloudWatch 控制台,网址为https://console.aws.amazon.com/cloudwatch/

  2. 在导航栏中,选择 日志

  3. 选择 /aws/kinesis-analytics/ 日志组MyApplication

  4. 选择 删除日志组,然后确认删除。

后续步骤

现在,您已经创建并运行了基本的 Managed Service for Apache Flink应用程序,该应用程序使用 Apache Beam 转换数据,有关更高级的 Managed Service for Apache Flink解决方案的示例,请参阅以下应用程序。