

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Apache Beam 创建应用程序
<a name="examples-beam"></a>

[在本练习中，您将创建一个 Managed Service for Apache Flink，该应用程序使用 Apache Beam 转换数据。](https://beam.apache.org/)Apache Beam 是一种用于处理流数据的编程模型。有关在 Managed Service for Apache Flink 中使用 Apache Beam 的信息，请参阅[将 Apache Beam 与 Managed Service for Apache Flink 应用程序结合使用](how-creating-apps-beam.md)。

**注意**  
要为本练习设置所需的先决条件，请先完成[教程：开始使用 Apache Flink 托管服务中的 DataStream API](getting-started.md)练习。

**Topics**
+ [创建相关资源](#examples-beam-resources)
+ [将示例记录写入输入流](#examples-beam-write)
+ [下载并检查应用程序代码](#examples-beam-download)
+ [编译应用程序代码](#examples-beam-compile)
+ [上传 Apache Flink 流式处理 Java 代码](#examples-beam-upload)
+ [创建并运行适用于 Apache Flink 的托管服务](#examples-beam-create-run)
+ [清理 AWS 资源](#examples-beam-cleanup)
+ [后续步骤](#examples-beam-nextsteps)

## 创建相关资源
<a name="examples-beam-resources"></a>

在本练习中，创建Managed Service for Apache Flink的应用程序之前，您需要创建以下从属资源：
+ 两个 Kinesis 数据流（`ExampleInputStream` 和 `ExampleOutputStream`）。
+ 存储应用程序代码 (`ka-app-code-<username>`) 的 Amazon S3 存储桶 

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明，请参阅以下主题：
+ *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。将数据流命名为 **ExampleInputStream** 和 **ExampleOutputStream**。
+ *Amazon Simple Storage Service 用户指南*中的[如何创建 S3 存储桶？](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。附加您的登录名，以便为 Amazon S3 存储桶指定全局唯一的名称，例如 **ka-app-code-*<username>***。

## 将示例记录写入输入流
<a name="examples-beam-write"></a>

在本节中，您使用 Python 脚本将随机字符串写入流，以供应用程序处理。

**注意**  
此部分需要 [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/)。

1. 使用以下内容创建名为 `ping.py` 的文件：

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. 运行 `ping.py` 脚本：

   ```
   $ python ping.py
   ```

   在完成本教程的其余部分时，请将脚本保持运行状态。

## 下载并检查应用程序代码
<a name="examples-beam-download"></a>

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码，请执行以下操作：

1. 如果尚未安装 Git 客户端，请安装它。有关更多信息，请参阅[安装 Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。

1. 使用以下命令克隆远程存储库：

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. 导航到 `amazon-kinesis-data-analytics-java-examples/Beam` 目录。

应用程序代码位于 `BasicBeamStreamingJob.java` 文件中。请注意有关应用程序代码的以下信息：
+ 该应用程序使用 Apache Beam 通过调用名[ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html)为的自定义转换函数来处理传入的记录。`PingPongFn`

  调用该`PingPongFn`函数的代码如下：

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ 使用 Apache Beam 的 Managed Service for Apache Flink 应用程序需要以下组件。如果您未在中包含这些组件和版本`pom.xml`，则您的应用程序会从环境依赖项中加载错误的版本，并且由于版本不匹配，您的应用程序将在运行时崩溃。

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ 除非输入数据是 **ping**，`PingPongFn`否则转换函数会将输入数据传递到输出流，在这种情况下，它会向输出流发出字符串 **pong\$1 n**。

  变换函数的代码如下：

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## 编译应用程序代码
<a name="examples-beam-compile"></a>

要编译应用程序，请执行以下操作：

1. 如果还没有 Java 和 Maven，请安装它们。有关更多信息，请参阅[教程：开始使用 Apache Flink 托管服务中的 DataStream API](getting-started.md)教程中的[完成所需的先决条件](getting-started.md#setting-up-prerequisites)。

1. 使用以下命令编译应用程序：

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**注意**  
提供的源代码依赖于 Java 11 中的库。

编译应用程序将创建应用程序 JAR 文件 (`target/basic-beam-app-1.0.jar`)。

## 上传 Apache Flink 流式处理 Java 代码
<a name="examples-beam-upload"></a>

在本节中，您将应用程序代码上传到在[创建相关资源](#examples-beam-resources)一节中创建的 Amazon S3 存储桶。

1. 在 Amazon S3 控制台中，选择 **ka-app-code-*<username>*** 存储桶，然后选择**上传**。

1. 在**选择文件**步骤中，选择**添加文件**。导航到您在上一步中创建的 `basic-beam-app-1.0.jar` 文件。

1. 您无需更改该对象的任何设置，因此，请选择**上传**。

您的应用程序代码现在存储在 Amazon S3 存储桶中，应用程序可以在其中访问代码。

## 创建并运行适用于 Apache Flink 的托管服务
<a name="examples-beam-create-run"></a>

按照以下步骤，使用控制台创建、配置、更新和运行应用程序。

### 创建应用程序
<a name="examples-beam-create"></a>

1. 登录并通过 /f AWS 管理控制台 link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com

1. 在 Managed Service for Apache Flink 控制面板上，选择**创建分析应用程序**。

1. 在**Managed Service for Apache Flink - 创建应用程序**页面上，提供应用程序详细信息，如下所示：
   + 对于 **应用程序名称 **，输入 **MyApplication**。
   + 对于**运行时系统**，请选择 **Apache Flink**。
**注意**  
Apache Beam 目前与 Apache Flink 版本 1.19 或更高版本不兼容。
   + 从版本下拉列表中选择 **Apache Flink 版本 1.15**。

1. 对于**访问权限**，请选择 **创建/更新 IAM 角色 `kinesis-analytics-MyApplication-us-west-2`**。

1. 选择**创建应用程序**。

**注意**  
在使用控制台创建应用程序的 Managed Service for Apache Flink时，您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的，如下所示：  
策略：`kinesis-analytics-service-MyApplication-us-west-2`
角色：`kinesis-analytics-MyApplication-us-west-2`

### 编辑 IAM 策略
<a name="get-started-exercise-7-console-iam"></a>

编辑 IAM policy 以添加访问 Kinesis 数据流的权限。

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 选择**策略**。选择控制台在上一部分中为您创建的 **`kinesis-analytics-service-MyApplication-us-west-2`** 策略。

1. 在 **摘要** 页面上，选择 **编辑策略**。选择 **JSON** 选项卡。

1. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 IDs (*012345678901*) 替换为您的账户 ID。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### 配置应用程序
<a name="examples-beam-configure"></a>

1. 在**MyApplication**页面上，选择**配置**。

1. 在 **配置应用程序** 页面上，提供 **代码位置**：
   + 对于**Amazon S3 存储桶**，请输入**ka-app-code-*<username>***。
   + **在 Amazon S3 对象的路径**中，输入**basic-beam-app-1.0.jar**。

1. 在 **对应用程序的访问权限** 下，对于 **访问权限**，选择 **创建/更新 IAM 角色 `kinesis-analytics-MyApplication-us-west-2`**。

1. 输入以下信息：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/examples-beam.html)

1. 在 **监控** 下，确保 ** 监控指标级别** 设置为 **应用程序**。

1. 要进行**CloudWatch 日志记录**，请选中 “**启用**” 复选框。

1. 选择**更新**。

**注意**  
当您选择启用 CloudWatch 日志记录时，适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示：  
日志组：`/aws/kinesis-analytics/MyApplication`
日志流：`kinesis-analytics-log-stream`
该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

### 运行应用程序
<a name="examples-beam-run"></a>

可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。

您可以在 CloudWatch 控制台上查看托管服务的 Apache Flink 指标，以验证应用程序是否正常运行。

## 清理 AWS 资源
<a name="examples-beam-cleanup"></a>

本节包括清理在 Tumbling Window 教程中创建的 AWS 资源的过程。

**Topics**
+ [删除 Managed Service for Apache Flink 应用程序](#examples-beam-cleanup-app)
+ [删除您的 Kinesis 数据流](#examples-beam-cleanup-stream)
+ [删除您的 Amazon S3 对象和存储桶](#examples-beam-cleanup-s3)
+ [删除您的 IAM 资源](#examples-beam-cleanup-iam)
+ [删除您的 CloudWatch 资源](#examples-beam-cleanup-cw)

### 删除 Managed Service for Apache Flink 应用程序
<a name="examples-beam-cleanup-app"></a>

1. 登录并通过 /f AWS 管理控制台 link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com

1. 在 Apache Flink 的托管服务面板中，选择。**MyApplication**

1. 在应用程序的页面中，选择 **删除**，然后确认删除。

### 删除您的 Kinesis 数据流
<a name="examples-beam-cleanup-stream"></a>

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在 Kinesis Data Streams 面板中，**ExampleInputStream**选择。

1. 在该**ExampleInputStream**页面中，选择 “**删除 Kinesis Stream**”，然后确认删除。

1. 在 **Kinesis 直播**页面中 **ExampleOutputStream**，选择，选择**操作**，选择**删除**，然后确认删除。

### 删除您的 Amazon S3 对象和存储桶
<a name="examples-beam-cleanup-s3"></a>

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择 **ka-app-code-*<username>* 存储桶。**

1. 选择 **删除**，然后输入存储桶名称以确认删除。

### 删除您的 IAM 资源
<a name="examples-beam-cleanup-iam"></a>

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在导航栏中，选择**策略**。

1. 在筛选条件控件中，输入 **kinesis**。

1. 选择 **kinesis-analytics-service--us-MyApplication west-2 策略**。

1. 选择 **策略操作**，然后选择 **删除**。

1. 在导航栏中，选择 **角色**。

1. 选择 k **inesis-analytics-us-west-2 角色MyApplication**。

1. 选择 **删除角色**，然后确认删除。

### 删除您的 CloudWatch 资源
<a name="examples-beam-cleanup-cw"></a>

1. 打开 CloudWatch 控制台，网址为[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 在导航栏中，选择 **日志**。

1. 选择**/aws/kinesis-analytics/MyApplication**日志组。

1. 选择 **删除日志组**，然后确认删除。

## 后续步骤
<a name="examples-beam-nextsteps"></a>

现在，您已经创建并运行了基本的 Managed Service for Apache Flink应用程序，该应用程序使用 Apache Beam 转换数据，有关更高级的 Managed Service for Apache Flink解决方案的示例，请参阅以下应用程序。
+ **[ Managed Service for Apache Flink 流式研讨会上的 Beam ](https://streaming-analytics.workshop.aws/beam-on-kda/)**：在本研讨会中，我们将探讨一个端到端的示例，该示例将批处理和流媒体方面结合在一个统一的 Apache Beam 管道中。