

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建并运行面向 Python 应用程序的 Managed Service for Apache Flink
<a name="gs-python-createapp"></a>

在本节中，您将创建面向 Python 的 Managed Service for Apache Flink 应用程序，并将 Kinesis 流作为源和接收器。

**Topics**
+ [创建相关资源](#gs-python-resources)
+ [设置本地开发环境](#gs-python-set-up)
+ [下载并检查 Apache Flink 流式处理 Python 代码](#gs-python-download)
+ [管理 JAR 依赖项](#gs-python-jar-dependencies)
+ [将示例记录写入输入流](#gs-python-sample-records)
+ [在本地运行应用程序](#gs-python-run-locally)
+ [观察 Kinesis 流中的输入和输出数据](#gs-python-observe-input-output)
+ [停止在本地运行的应用程序](#gs-python-stop)
+ [打包应用程序代码](#gs-python-package-code)
+ [将应用程序包上传到 Amazon S3 存储桶](#gs-python-upload-bucket)
+ [创建并配置 Managed Service for Apache Flink 应用程序](#gs-python-7)
+ [后续步骤](#gs-python-next-step-4)

## 创建相关资源
<a name="gs-python-resources"></a>

在本练习中创建 Managed Service for Apache Flink之前，您需要创建以下从属资源：
+ 两个 Kinesis 流用于输入和输出。
+ 存储应用程序代码的 Amazon S3 存储桶。

**注意**  
本教程假设您在 us-east-1 区域中部署应用程序。如果您使用其他区域，则必须相应地调整所有步骤。

### 创建两个 Kinesis 流
<a name="gs-python-resources-streams"></a>

在为本练习创建 Managed Service for Apache Flink 应用程序之前，请在要用于部署应用程序的同一区域（本示例中的 us-east-1）中创建两个 Kinesis 数据流（`ExampleInputStream` 和 `ExampleOutputStream`）。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。

**创建数据流 (AWS CLI)**

1. 要创建第一个直播 (`ExampleInputStream`)，请使用以下 Amazon Kinesis 命令`create-stream` AWS CLI 。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. 要创建应用程序用来写入输出的第二个流，请运行同一命令（将流名称更改为 `ExampleOutputStream`）。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### 创建 Amazon S3 存储桶
<a name="gs-python-resources-s3"></a>

您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明，请参阅以下主题：
+ *《Amazon Simple Storage Service 用户指南》*中的[如何创建 S3 存储桶？](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。附加您的登录名，以便为 Amazon S3 存储桶指定全局唯一的名称。
**注意**  
请确保在用于本教程的区域（us-east-1）中创建 S3 存储桶。

### 其他 资源
<a name="gs-python-resources-cw"></a>

在您创建应用程序时，适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源（如果这些资源尚不存在）：
+ 名为 `/AWS/KinesisAnalytics-java/<my-application>` 的日志组。
+ 名为 `kinesis-analytics-log-stream` 的日志流。

## 设置本地开发环境
<a name="gs-python-set-up"></a>

为进行开发和调试，可以在自己的计算机上运行 Python Flink 应用程序。您可以使用 `python main.py` 或所选的 Python IDE 从命令行中启动应用程序。

**注意**  
在您的开发计算机上，必须安装 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我们建议您使用诸如[PyCharm](https://www.jetbrains.com/pycharm/)或 [Visual Studio Code](https://code.visualstudio.com/) 之类的 IDE。要验证您是否满足所有先决条件，请在继续操作之前参阅 [满足完成练习的先决条件](gs-python.md#gs-python-prerequisites)。

### 安装 PyFlink 库
<a name="gs-python-install-pyflink"></a>

要开发您的应用程序并在本地运行它，您必须安装 Flink Python 库。

1. 使用 VirtualEnv、Conda 或任何类似的 Python 工具创建独立的 Python 环境。

1. 在该环境中安装 PyFlink 库。使用与 Amazon Managed Service for Apache Flink 中所使用相同的 Apache Flink 运行时版本。目前，建议的运行时版本为 1.19.1。

   ```
   $ pip install apache-flink==1.19.1
   ```

1. 运行应用程序时，请确保环境处于激活状态。如果在 IDE 中运行应用程序，请确保 IDE 使用该环境作为运行时。该过程取决于您使用的 IDE。
**注意**  
您只需要安装 PyFlink 库即可。您**无**需在计算机上安装 Apache Flink 集群。

### 对您的 AWS 会话进行身份验证
<a name="gs-python-authenticate"></a>

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时，您必须拥有有效的 AWS 经过身份验证的会话，并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证：

1. 如果您没有配置带有有效凭据 AWS CLI 的命名配置文件，请参阅[设置 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 通过发布以下测试记录，验证您的配置 AWS CLI 是否正确，并且您的用户有权写入 Kinesis 数据流：

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. 如果您的 IDE 有要集成的插件 AWS，则可以使用该插件将凭据传递给 IDE 中运行的应用程序。有关更多信息，请参阅适用于 [Visual Studio 代码[的AWS 工具](https://aws.amazon.com/pycharm/)AWS 包 PyCharm、适用于](https://aws.amazon.com/visualstudiocode/) IntelliJ IDEA 的[AWS 工具包和适用于 IntelliJ](https://aws.amazon.com/intellij/) IDEA 的工具包。

## 下载并检查 Apache Flink 流式处理 Python 代码
<a name="gs-python-download"></a>

此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码，请执行以下操作：

1. 使用以下命令克隆远程存储库：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 导航到 `./python/GettingStarted` 目录。

### 审核应用程序组件
<a name="gs-python-review"></a>

应用程序代码位于 `main.py` 中。我们使用 Python 中嵌入的 SQL 来定义应用程序的流程。

**注意**  
为优化开发人员体验，该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行，以便在您的计算机上进行开发。应用程序使用环境变量 `IS_LOCAL = true` 来检测其何时在本地运行。必须在 Shell 上或 IDE 的运行配置中设置环境变量 `IS_LOCAL = true`。
+ 应用程序设置执行环境并读取运行时配置。要在 Amazon Managed Service for Apache Flink 上和本地运行，应用程序会检查 `IS_LOCAL` 变量。
  + 以下是应用程序在 Amazon Managed Service for Apache Flink 中运行时的默认行为：

    1. 加载随应用程序打包的依赖项。有关更多信息，请参阅（LINK）

    1. 从在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载配置。有关更多信息，请参阅（LINK）
  + 如果在本地运行应用程序，则在应用程序检测 `IS_LOCAL = true` 时：

    1. 从项目加载外部依赖项。

    1. 从项目中包含的 `application_properties.json` 文件加载配置。

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ 应用程序使用 [Kinesis 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/)定义带有 `CREATE TABLE` 语句的源表。此表从输入 Kinesis 流中读取数据。应用程序从运行时配置中获取流的名称、区域和初始位置。

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ 在此示例中，应用程序还使用 [Kinesis 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/)定义接收表。此表将数据发送到输出 Kinesis 流。

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ 最后，应用程序执行 SQL，该 SQL 从源表中 `INSERT INTO...` 接收器表。在更复杂的应用程序中，在写入接收器之前，您可能还要执行其他步骤来转换数据。

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ 您必须在 `main()` 函数末尾再添加一个步骤，才能在本地运行应用程序：

  ```
  if is_local:
      table_result.wait()
  ```

  如果不使用此语句，则当您在本地运行应用程序时，它会立即终止。在 Amazon Managed Service for Apache Flink 中运行应用程序时，不得执行此语句。

## 管理 JAR 依赖项
<a name="gs-python-jar-dependencies"></a>

 PyFlink 应用程序通常需要一个或多个连接器。本教程中的应用程序使用 [Kinesis 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/)。由于 Apache Flink 在 Java JVM 中运行，因此无论是否使用 Python 实施应用程序，连接器都会作为 JAR 文件分发。在 Amazon Managed Service for Apache Flink 上部署应用程序时，必须将这些依赖项与应用程序打包。

在此示例中，我们展示如何使用 Apache Maven 获取依赖项并打包应用程序以在 Managed Service for Apache Flink 上运行。

**注意**  
还可使用其他方法获取和打包依赖项。此示例演示正确适用于一个或多个连接器的方法。它还允许您在本地运行应用程序以进行开发，也可以在 Managed Service for Apache Flink 上运行应用程序，而无需更改代码。

### 使用 pom.xml 文件
<a name="gs-python-jar-pom"></a>

Apache Maven 使用 `pom.xml` 文件来控制依赖项和应用程序打包。

所有 JAR 依赖项均在 `pom.xml` 文件的 `<dependencies>...</dependencies>` 块中指定。

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

要查找要使用的正确构件和连接器版本，请参阅 [将 Apache Flink 连接器与 Managed Service for Apache Flink 一起使用](how-flink-connectors.md)。请务必参考正在使用的 Apache Flink 版本。在此示例中，我们使用的是 Kinesis 连接器。对于 Apache Flink 1.19，连接器版本为 `4.3.0-1.19`。

**注意**  
如果您使用的是 Apache Flink 1.19，则没有专门为此版本发布的连接器版本。使用为 1.18 版本发布的连接器。

### 下载和打包依赖项
<a name="gs-python-dependencies-download"></a>

使用 Maven 下载 `pom.xml` 文件中定义的依赖项，然后将其打包给 Python Flink 应用程序。

1. 导航到包含 Python 入门项目（名为 `python/GettingStarted`）的目录。

1. 运行如下命令：

```
$ mvn package
```

Maven 创建名为 `./target/pyflink-dependencies.jar` 的新文件。当您在计算机上进行本地开发时，Python 应用程序会查找此文件。

**注意**  
如果您忘记运行此命令，则在尝试运行应用程序时，它将失败并显示错误：**找不到标识符 "kinesis" 的任何工厂**。

## 将示例记录写入输入流
<a name="gs-python-sample-records"></a>

在本节中，使用 Python 脚本将示例记录写入流，以供应用程序处理。您可以通过两种方式生成示例数据，即使用 Python 脚本或使用 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)。

### 使用 Python 脚本生成示例数据
<a name="gs-python-sample-data"></a>

您可以使用 Python 脚本将示例记录发送到数据流。

**注意**  
要运行此 Python 脚本，必须使用 Python 3.x 并安装[适用于 Python 的AWS SDK（Boto）](https://aws.amazon.com/developer/language/python/)库。

**要开始向 Kinesis 输入流发送测试数据，请执行以下操作：**

1. 从数据生成器[ GitHub 存储库](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator)下载数据生成器 `stock.py` Python 脚本。

1. 运行 `stock.py` 脚本：

   ```
   $ python stock.py
   ```

在完成本教程的其余部分时，请将脚本保持运行状态。您现在可以运行 Apache Flink 应用程序。

### 使用 Kinesis Data Generator 生成示例数据
<a name="gs-python-sample-kinesis"></a>

除了使用 Python 脚本之外，还可以使用 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)（也以[托管版本](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)提供）将随机示例数据发送到流中。Kinesis Data Generator 在浏览器中运行，无需在计算机上安装任何工具。

**要设置和运行 Kinesis Data Generator，请执行以下操作：**

1. 按照 [Kinesis Data Generator 文档](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)中的说明设置该工具的访问权限。您将运行一个用于设置用户和密码的 CloudFormation 模板。

1. 通过模板生成的网址访问 Kinesis 数据生成器。 CloudFormation CloudFormation 模板完成后，您可以在 “**输出**” 选项卡中找到 URL。

1. 配置数据生成器：
   + **区域：**选择您在本教程中使用的区域：us-east-1
   + **流/传输流：**选择应用程序将使用的输入流：`ExampleInputStream`
   + **每秒记录数：**100
   + **记录模板：**复制并粘贴以下模板：

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 测试模板：选择**测试模板**并验证生成的记录是否与以下内容类似：

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 启动数据生成器：选择**选择发送数据**。

Kinesis Data Generator 现在向 `ExampleInputStream` 发送数据。

## 在本地运行应用程序
<a name="gs-python-run-locally"></a>

可以在本地测试应用程序，并且使用 `python main.py` 从命令行或从 IDE 运行该应用程序。

要在本地运行应用程序，必须安装正确版本的 PyFlink库，如上一节所述。有关更多信息，请参阅（LINK）

**注意**  
在继续之前，请确认输入和输出流是否可用。请参阅[创建两个 Amazon Kinesis 数据流](get-started-exercise.md#get-started-exercise-1)。此外，请确认您是否具有从两个流中读取和写入的权限。请参阅[对您的 AWS 会话进行身份验证](get-started-exercise.md#get-started-exercise-2-5)。

### 将 Python 项目导入您的 IDE
<a name="gs-python-import"></a>

要开始在 IDE 中处理应用程序，必须将其作为 Python 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中，请将 `./python/GettingStarted` 子目录中的内容导入 IDE。

将代码作为现有 Python 项目导入。

**注意**  
导入新 Python 项目的确切过程因所使用的 IDE 而异。

### 检查本地应用程序配置
<a name="gs-python-check-configuration"></a>

在本地运行时，应用程序使用 `./src/main/resources` 下项目资源文件夹中的 `application_properties.json` 文件内的配置。您可以编辑此文件以使用不同的 Kinesis 流名称或区域。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### 在本地运行 Python 应用程序
<a name="gs-python-run-locally"></a>

可以在本地运行应用程序，可以从命令行中作为常规 Python 脚本运行，也可以从 IDE 中运行。

**要运行应用程序，请从命令行运行**

1. 确保独立 Python 环境（例如 Conda 或你安装了 Python Flink 库 VirtualEnv 的地方）当前处于活动状态。

1. 确保至少运行一次 `mvn package`。

1. 设置 `IS_LOCAL = true` 环境变量：

   ```
   $ export IS_LOCAL=true
   ```

1. 将该应用程序作为常规 Python 脚本运行。

   ```
   $python main.py
   ```

**从 IDE 中运行应用程序**

1. 使用以下配置将 IDE 配置为运行 `main.py` 脚本：

   1. 使用独立的 Python 环境，例如 Conda 或你安装 PyFlink 库 VirtualEnv 的地方。

   1. 使用 AWS 凭据访问输入和输出 Kinesis 数据流。

   1. 设置 `IS_LOCAL = true`。

1. 设置运行配置的确切过程取决于您的 IDE，并且会有所不同。

1. 设置 IDE 后，运行 Python 脚本，并在应用程序运行时使用 IDE 提供的工具。

### 在本地检查应用程序日志
<a name="gs-python-run-IDE"></a>

在本地运行时，除了在应用程序启动时打印和显示的几行之外，应用程序不会在控制台中显示任何日志。 PyFlink 将日志写入安装了 Python Flink 库的目录中的一个文件中。应用程序启动时会打印日志的位置。还可以运行以下命令来查找日志：

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. 列出日志目录中的文件。通常会找到单个的 `.log` 文件。

1. 在应用程序运行时追踪文件：`tail -f <log-path>/<log-file>.log`。

## 观察 Kinesis 流中的输入和输出数据
<a name="gs-python-observe-input-output"></a>

您可以使用 Amazon Kinesis 控制台中的**数据查看器**观察由（生成示例 Python）或 Kinesis Data Generator（链接）发送到输入流的记录。

**要观察记录，请执行以下操作：**  [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)  验证区域是否与运行本教程的区域相同，默认为 us-east-1 美国东部（弗吉尼亚州北部）。如果区域不匹配，请更改区域。   选择**数据流**。   选择您要观察的流，即 `ExampleInputStream` 或 `ExampleOutputStream.`   选择**数据查看器**选项卡。   选择任意**分片**，保持**最新**作为**起始位置**，然后选择**获取记录**。您可能会看到“未找到该请求的记录”错误。如果看到此错误，请选择**重试获取记录**。发布到流显示的最新记录。   在“数据”列中选择值以检查 JSON 格式的记录内容。   

## 停止在本地运行的应用程序
<a name="gs-python-stop"></a>

停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于 IDE。

## 打包应用程序代码
<a name="gs-python-package-code"></a>

在本节中，您会使用 Apache Maven 将应用程序代码和所有必需的依赖项打包到一个 .zip 文件中。

再次运行 Maven 打包命令：

```
$ mvn package
```

此命令会生成文件 `target/managed-flink-pyflink-getting-started-1.0.0.zip`。

## 将应用程序包上传到 Amazon S3 存储桶
<a name="gs-python-upload-bucket"></a>

在本节中，您要将在上一节中创建的 .zip 文件上传到在本教程开始时创建的 Amazon Simple Storage Service（Amazon S3）存储桶中。如果您尚未完成此步骤，请参阅（链接）。

**上传应用程序代码 JAR 文件**

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择您之前为应用程序代码创建的存储桶。

1. 选择**上传**。

1. 选择**添加文件**。

1. 导航到上一步中生成的 .zip 文件：`target/managed-flink-pyflink-getting-started-1.0.0.zip`。

1. 在不更改任何其他设置的情况下选择**上传**。

## 创建并配置 Managed Service for Apache Flink 应用程序
<a name="gs-python-7"></a>

您可以使用控制台或 AWS CLI创建和配置 Managed Service for Apache Flink 应用程序。在本教程中，我们将使用控制台。

### 创建应用程序
<a name="gs-python-7-console-create"></a>

1. 登录并通过 /f AWS 管理控制台 link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com

1. 确认选择正确的区域：美国东部（弗吉尼亚北部）us-east-1。

1. 打开右侧的菜单，选择 **Apache Flink 应用程序**，然后选择**创建流应用程序**。或者，从初始页面的**入门**部分中选择**创建流应用程序**。

1. 在**创建流应用程序**页面上：
   + 在**选择设置流处理应用程序的方法**中，选择**从头开始创建**。
   + 对于 **Apache Flink 配置，应用程序 Flink 版本**，请选择 **Apache Flink 1.19**。
   + 对于**应用程序配置**：
     + 对于 **应用程序名称 **，输入 **MyApplication**。
     + 对于**描述**，输入 **My Python test app**。
     + 在**访问应用程序资源**中，选择使用所需策略**创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1**。
   + 对于**应用程序模板设置**：
     + 对于**模板**，选择**开发**。
   + 选择**创建流应用程序**。

**注意**  
在使用控制台创建应用程序的 Managed Service for Apache Flink时，您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的，如下所示：  
策略：`kinesis-analytics-service-MyApplication-us-west-2`
角色：`kinesisanalytics-MyApplication-us-west-2`
Amazon Managed Service for Apache Flink 之前称为 *Kinesis Data Analytics*。为实现向后兼容，自动创建的资源名称前缀为 `kinesis-analytics`。

### 编辑 IAM 策略
<a name="gs-python-7-console-iam"></a>

编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。

**编辑 IAM policy 以添加 S3 存储桶权限**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 选择**策略**。选择控制台在上一部分中为您创建的 **`kinesis-analytics-service-MyApplication-us-east-1`** 策略。

1. 选择**编辑**，然后选择 **JSON** 选项卡。

1. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 IDs (*012345678901*) 替换为您的账户 ID。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. 选择**下一步**，然后选择**保存更改**。

### 配置应用程序
<a name="gs-python-7-console-configure"></a>

编辑应用程序配置以设置应用程序代码构件。

**配置应用程序**

1. 在**MyApplication**页面上，选择**配置**。

1. 在**应用程序代码位置**部分中：
   + 对于 **Amazon S3 存储桶**，请选择之前为应用程序代码创建的存储桶。选择**浏览**并选择正确的存储桶，然后选择**选择**。请勿在存储桶名称上选择。
   + 在 **Amazon S3 对象的路径**中，输入 **managed-flink-pyflink-getting-started-1.0.0.zip**。

1. 对于**访问权限**，请选择**创建/更新具有必要策略的 IAM 角色 `kinesis-analytics-MyApplication-us-east-1`**。

1. 移至**运行时属性**，并保留所有其他设置的默认值。

1. 选择**添加新项目**并添加以下每个参数：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/gs-python-createapp.html)

1. 请勿修改任何其他部分，然后选择**保存更改**。

**注意**  
当您选择启用 Amazon CloudWatch 日志时，适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示：  
日志组：`/aws/kinesis-analytics/MyApplication`
日志流：`kinesis-analytics-log-stream`

### 运行应用程序
<a name="gs-python-7-console-run"></a>

应用程序现已完成配置，准备好运行。

**运行应用程序**

1. 在 Amazon Managed Service for Apache Flink 的控制台上，选择**我的应用程序**，然后选择**运行**。

1. 在下一页的“应用程序还原配置”页面上，选择**使用最新快照运行**，然后选择**运行**。

   **应用程序详细信息**中的**状态**会从 `Ready` 转换到 `Starting`，然后在应用程序启动时转换到 `Running`。

当应用程序处于 `Running` 状态时，您现在可以打开 Flink 控制面板。

**打开控制面板**

1. 选择**打开 Apache Flink 控制面板**。控制面板将在新页面上打开。

1. 在**正在运行的作业**列表中，选择您可以看到的单个作业。
**注意**  
如果您错误设置运行时属性或编辑 IAM 策略，则应用程序状态可能会变为 `Running`，但是 Flink 控制面板显示任务正在持续重新启动。如果应用程序配置错误或缺乏访问外部资源的权限，则通常会出现这种故障场景。  
发生这种情况时，请检查 Flink 控制面板中的**异常**选项卡以查看问题的原因。

### 观察运行中应用程序的指标
<a name="gs-python-observe-metrics"></a>

在该**MyApplication**页面的 **Amazon CloudWatch 指标**部分，您可以看到正在运行的应用程序中的一些基本指标。

**查看指标**

1. 在**刷新**按钮旁边，从下拉列表中选择 **10 秒**。

1. 当应用程序运行且运行状况良好时，您可以看到**正常运行时间**指标不断增加。

1. **完全重新启动**指标应为零。如果该指标增加，则配置可能存在问题。要调查问题，请审查 Flink 控制面板上的**异常**选项卡。

1. 在运行状况良好的应用程序中，**失败的检查点数**指标应为零。
**注意**  
此控制面板显示一组固定的指标，且粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。

### 观察 Kinesis 流中的输出数据
<a name="gs-python-observe-output"></a>

确保您仍在使用 Python 脚本或 Kinesis Data Generator 将数据发布到输入中。

现在，您可以使用中的数据查看器来观察在 Apache Flink 托管服务上运行的应用程序的输出 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)，就像之前所做的那样。

**查看输出**

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 确认该区域与您运行本教程时使用的区域相同。默认情况下，区域为 US-East-1US 东部（弗吉尼亚北部）。如有必要，可以更改区域。

1. 选择**数据流**。

1. 选择您要观察的流。在本教程中，请使用 `ExampleOutputStream`。

1.  选择**数据查看器**选项卡。

1. 选择任意**分片**，保持**最新**作为**起始位置**，然后选择**获取记录**。您可能会看到“未找到该请求的记录”错误。如果看到此错误，请选择**重试获取记录**。发布到流显示的最新记录。

1. 在“数据”列中选择值以检查 JSON 格式的记录内容。

### 停止应用程序
<a name="gs-python-7-console-stop"></a>

要停止应用程序，请转至名为 `MyApplication` 的 Managed Service for Apache Flink 应用程序的控制台页面。

**停止应用程序**

1. 从**操作**下拉列表中，选择**停止**。

1. **应用程序详细信息**中的**状态**会从 `Running` 转换到 `Stopping`，然后在应用程序完全停止时转换到 `Ready`。
**注意**  
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。

## 后续步骤
<a name="gs-python-next-step-4"></a>

[清理 AWS 资源](gs-python-cleanup.md)