

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 开始使用适用于 Apache Flink 的亚马逊托管服务 (DataStream API)
<a name="getting-started"></a>

本节向您介绍适用于 Apache Flink 的托管服务的基本概念，以及使用 API 在 Java 中实现应用程序。 DataStream 它介绍了可用于创建和测试应用程序的选项。它还提供了相应的说明以安装所需的工具，以完成本指南中的教程和创建第一个应用程序。

**Topics**
+ [审核 Managed Service for Apache Flink 应用程序组件](#getting-started-components)
+ [满足完成练习的先决条件](#setting-up-prerequisites)
+ [设置 AWS 账户并创建管理员用户](setting-up.md)
+ [设置 AWS Command Line Interface (AWS CLI)](setup-awscli.md)
+ [创建并运行适用于 Apache Flink 的托管服务应用程序](get-started-exercise.md)
+ [清理 AWS 资源](getting-started-cleanup.md)
+ [探索其他资源](getting-started-next-steps.md)

## 审核 Managed Service for Apache Flink 应用程序组件
<a name="getting-started-components"></a>

**注意**  
适用于 Apache Flink 的亚马逊托管服务 Flink 支持所有 Apache Flink，可能还支持所有 J APIs VM 语言。有关更多信息，请参阅 [Flink 的。 APIs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/overview/#flinks-apis)  
根据您选择的 API，应用程序的结构和实施会略有不同。本入门教程介绍如何在 Java 中使用 DataStream API 实现应用程序。

为处理数据，Managed Service for Apache Flink 应用程序使用 Java 应用程序，其使用 Apache Flink 运行时系统处理输入和生成输出。

典型的 Managed Service for Apache Flink 应用程序包含以下组件：
+ **运行时属性：**您可以使用*运行时属性*将配置参数传递给应用程序，以便在不修改和重新发布代码的情况下对其进行更改。
+ **源：**应用程序使用来自一个或多个*源*的数据。源使用[连接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/overview/)从外部系统（例如 Kinesis 数据流或 Kafka 存储桶）读取数据。有关更多信息，请参阅 [添加流数据源](how-sources.md)。
+ **运算符：**应用程序使用一个或多个*运算符* 以处理数据。运算符可以转换、丰富或聚合数据。有关更多信息，请参阅 [运算符](how-operators.md)。
+ **接收器：**应用程序通过*接收器*将数据发送到外部源。接收器使用[连接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/overview/)将数据发送到 Kinesis 数据流、Kifka 主题、Amazon S3 或关系数据库。也可以使用特殊的连接器，仅出于开发目的打印输出。有关更多信息，请参阅 [使用接收器写入数据](how-sinks.md)。

您的应用程序需要一些*外部依赖项*，例如应用程序使用的 Flink 连接器或潜在的 Java 库。要在 Amazon Managed Service for Apache Flink 中运行，必须将应用程序与依赖项一起打包到 *fat-jar* 中，然后上传到 Amazon S3 存储桶。然后，您创建一个 Managed Service for Apache Flink 应用程序。您可以传递代码包的位置以及任何其他运行时配置参数。

本教程演示如何使用 Apache Maven 打包应用程序，以及如何在您选择的 IDE 中本地运行应用程序。

## 满足完成练习的先决条件
<a name="setting-up-prerequisites"></a>

要完成本指南中的步骤，您必须满足以下条件：
+ [Git 客户端](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。如果尚未安装 Git 客户端，请安装它。
+ [Java 开发套件 (JDK) 版本 11](https://www.oracle.com/java/technologies/downloads/#java11)。安装 Java JDK 11 并设置 `JAVA_HOME` 环境变量，使其指向您的 JDK 安装位置。如果没有 JDK 11，可以使用 [Amazon Coretto 11](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/what-is-corretto-11.html) 或选择的任何其他标准 JDK。
  + 要验证是否已正确安装 SDK，请运行以下命令。如果使用 Amazon Corretto 以外的 JDK，则输出会有所不同。确保版本为 11.x。

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/)。如果尚未安装 Apache Maven，请安装它。要了解如何安装该工具，请参阅[安装 Apache Maven](https://maven.apache.org/install.html)。
  + 要测试您的 Apache Maven 安装，请输入以下内容：

  ```
  $ mvn -version
  ```
+ 用于本地开发的 IDE。我们建议您使用开发环境（如 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/)）来开发和编译您的应用程序。
  + 要测试您的 Apache Maven 安装，请输入以下内容：

  ```
  $ mvn -version
  ```

要开始，请转到[设置 AWS 账户并创建管理员用户](setting-up.md)。

# 设置 AWS 账户并创建管理员用户
<a name="setting-up"></a>

首次使用 Managed Service for Apache Flink 之前，请完成以下任务：

## 注册获取 AWS 账户
<a name="sign-up-for-aws"></a>

如果您没有 AWS 账户，请完成以下步骤来创建一个。

**要注册 AWS 账户**

1. 打开[https://portal.aws.amazon.com/billing/注册。](https://portal.aws.amazon.com/billing/signup)

1. 按照屏幕上的说明操作。

   在注册时，将接到电话或收到短信，要求使用电话键盘输入一个验证码。

   当您注册时 AWS 账户，就会创建*AWS 账户根用户*一个。根用户有权访问该账户中的所有 AWS 服务 和资源。作为最佳安全实践，请为用户分配管理访问权限，并且只使用根用户来执行[需要根用户访问权限的任务](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks)。

AWS 注册过程完成后会向您发送一封确认电子邮件。您可以随时前往 [https://aws.amazon.com/](https://aws.amazon.com/)并选择 “**我的账户”，查看您当前的账户活动并管理您的账户**。

## 创建具有管理访问权限的用户
<a name="create-an-admin"></a>

注册后，请保护您的安全 AWS 账户 AWS 账户根用户 AWS IAM Identity Center，启用并创建管理用户，这样您就不会使用 root 用户执行日常任务。

**保护你的 AWS 账户根用户**

1.  选择 **Root 用户**并输入您的 AWS 账户 电子邮件地址，以账户所有者的身份登录。[AWS 管理控制台](https://console.aws.amazon.com/)在下一页上，输入您的密码。

   要获取使用根用户登录方面的帮助，请参阅《AWS 登录 用户指南》**中的 [Signing in as the root user](https://docs.aws.amazon.com/signin/latest/userguide/console-sign-in-tutorials.html#introduction-to-root-user-sign-in-tutorial)。

1. 为您的根用户启用多重身份验证（MFA）。

   有关说明，请参阅 I [A *M* 用户指南中的为 AWS 账户 根用户启用虚拟 MFA 设备（控制台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/enable-virt-mfa-for-root.html)。

**创建具有管理访问权限的用户**

1. 启用 IAM Identity Center。

   有关说明，请参阅**《AWS IAM Identity Center 用户指南》中的[启用 AWS IAM Identity Center](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-set-up-for-idc.html)。

1. 在 IAM Identity Center 中，为用户授予管理访问权限。

   有关使用 IAM Identity Center 目录 作为身份源的教程，请参阅《[用户*指南》 IAM Identity Center 目录中的使用默认设置配置AWS IAM Identity Center 用户*访问权限](https://docs.aws.amazon.com//singlesignon/latest/userguide/quick-start-default-idc.html)。

**以具有管理访问权限的用户身份登录**
+ 要使用您的 IAM Identity Center 用户身份登录，请使用您在创建 IAM Identity Center 用户时发送到您的电子邮件地址的登录 URL。

  有关使用 IAM Identity Center 用户[登录的帮助，请参阅*AWS 登录 用户指南*中的登录 AWS 访问门户](https://docs.aws.amazon.com/signin/latest/userguide/iam-id-center-sign-in-tutorial.html)。

**将访问权限分配给其他用户**

1. 在 IAM Identity Center 中，创建一个权限集，该权限集遵循应用最低权限的最佳做法。

   有关说明，请参阅《AWS IAM Identity Center 用户指南》**中的 [Create a permission set](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-started-create-a-permission-set.html)。

1. 将用户分配到一个组，然后为该组分配单点登录访问权限。

   有关说明，请参阅《AWS IAM Identity Center 用户指南》**中的 [Add groups](https://docs.aws.amazon.com//singlesignon/latest/userguide/addgroups.html)。

## 授权以编程方式访问
<a name="setting-up-access"></a>

如果用户想在 AWS 外部进行交互，则需要编程访问权限 AWS 管理控制台。授予编程访问权限的方式取决于正在访问的用户类型 AWS。

要向用户授予编程式访问权限，请选择以下选项之一。


****  

| 哪个用户需要编程式访问权限？ | 目的 | 方式 | 
| --- | --- | --- | 
| IAM | （推荐）使用控制台凭证作为临时凭证，签署对 AWS CLI AWS SDKs、或的编程请求 AWS APIs。 |  按照您希望使用的界面的说明进行操作。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/setting-up.html)  | 
|  人力身份 （在 IAM Identity Center 中管理的用户）  | 使用临时证书签署向 AWS CLI AWS SDKs、或发出的编程请求 AWS APIs。 |  按照您希望使用的界面的说明进行操作。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/setting-up.html)  | 
| IAM | 使用临时证书签署向 AWS CLI AWS SDKs、或发出的编程请求 AWS APIs。 | 按照 IAM 用户指南中的将[临时证书与 AWS 资源配合使用](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html)中的说明进行操作。 | 
| IAM | （不推荐使用）使用长期凭证签署向 AWS CLI AWS SDKs、或发出的编程请求 AWS APIs。 |  按照您希望使用的界面的说明进行操作。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/setting-up.html)  | 

## 下一个步骤
<a name="setting-up-next-step-2"></a>

[设置 AWS Command Line Interface (AWS CLI)](setup-awscli.md)

# 设置 AWS Command Line Interface (AWS CLI)
<a name="setup-awscli"></a>

在此步骤中，您将下载并配置为与适用于 Apache Flink 的托管服务一起使用。 AWS CLI 

**注意**  
本指南中的入门练习假定您使用账户中的管理员凭证 (`adminuser`) 来执行这些操作。

**注意**  
如果您已经 AWS CLI 安装了，则可能需要升级才能获得最新功能。有关更多信息，请参阅AWS Command Line Interface 《用户指南》**中的[安装 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html)。要检查的版本 AWS CLI，请运行以下命令：  

```
aws --version
```
本教程中的练习需要以下 AWS CLI 版本或更高版本：  

```
aws-cli/1.16.63
```

**要设置 AWS CLI**

1. 下载并配置 AWS CLI。有关说明，请参阅《AWS Command Line Interface 用户指南》**中的以下主题：
   + [安装 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [配置 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. 在文件中为管理员用户添加命名的配置 AWS CLI `config`文件。在执行 AWS CLI 命令时，您将使用此配置文件。有关命名配置文件的更多信息，请参阅 *AWS Command Line Interface 用户指南*中的[命名配置文件](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html)。

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   有关可用 AWS 区域的列表，请参阅中的[区域和终端节点*Amazon Web Services 一般参考*](https://docs.aws.amazon.com/general/latest/gr/rande.html)。
**注意**  
本教程中的示例代码和命令使用 us-east-1 美国东部（弗吉尼亚州北部）区域。要使用不同的区域，请将本教程的代码和命令中的区域更改为要使用的区域。

1. 在命令提示符处输入以下帮助命令来验证设置：

   ```
   aws help
   ```

设置 AWS 帐户和之后 AWS CLI，您可以尝试下一个练习，即配置示例应用程序并测试 end-to-end设置。

## 后续步骤
<a name="setup-awscli-next-step-3"></a>

[创建并运行适用于 Apache Flink 的托管服务应用程序](get-started-exercise.md)

# 创建并运行适用于 Apache Flink 的托管服务应用程序
<a name="get-started-exercise"></a>

在此步骤中，您将创建 Managed Service for Apache Flink 应用程序，并将 Kinesis 数据流作为源和接收器。

**Topics**
+ [创建相关资源](#get-started-exercise-0)
+ [设置本地开发环境](#get-started-exercise-2)
+ [下载并检查 Apache Flink 流式处理 Java 代码](#get-started-exercise-5)
+ [将示例记录写入输入流](#get-started-exercise-5-4)
+ [在本地运行应用程序](#get-started-exercise-5-run)
+ [观察 Kinesis 流中的输入和输出数据](#get-started-exercise-input-output)
+ [停止在本地运行的应用程序](#get-started-exercise-stop)
+ [编译并打包您的应用程序代码](#get-started-exercise-5-5)
+ [上传应用程序代码 JAR 文件](#get-started-exercise-6)
+ [创建并配置 Managed Service for Apache Flink 应用程序](#get-started-exercise-7)
+ [后续步骤](#get-started-exercise-next-step-4)

## 创建相关资源
<a name="get-started-exercise-0"></a>

在本练习中，创建Managed Service for Apache Flink的应用程序之前，您需要创建以下从属资源：
+ 两个 Kinesis 流用于输入和输出
+ 存储应用程序代码的 Amazon S3 存储桶
**注意**  
本教程假设您在 us-east-1 美国东部（弗吉尼亚州北部）区域部署应用程序。如果您使用其他区域，请相应地调整所有步骤。

### 创建两个 Amazon Kinesis 数据流
<a name="get-started-exercise-1"></a>

在为本练习创建 Managed Service for Apache Flink 应用程序之前，请创建两个 Kinesis 数据流（`ExampleInputStream`和`ExampleOutputStream`）。您的应用程序将这些数据流用于应用程序源和目标流。

您可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些直播。有关控制台说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。要使用创建直播 AWS CLI，请使用以下命令，根据您用于应用程序的区域进行调整。

**创建数据流 (AWS CLI)**

1. 要创建第一个直播 (`ExampleInputStream`)，请使用以下 Amazon Kinesis 命令 `create-stream` AWS CLI ：

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. 要创建应用程序用来写入输出的第二个流，请运行同一命令（将流名称更改为 `ExampleOutputStream`）：

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### 为应用程序代码创建一个 Amazon S3 存储桶
<a name="get-started-exercise-1-5"></a>

您可以使用控制台来创建 Amazon S3 存储桶。要了解如何使用控制台创建 Amazon S3 存储桶，请参阅 [Amazon S3 用户指南](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)中的[创建存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。使用全局唯一名称命名 Amazon S3 存储桶，例如通过附加登录名。

**注意**  
 请确保在用于本教程的区域（us-east-1）中创建存储桶。

### 其他 资源
<a name="get-started-exercise-1-6"></a>

在您创建应用程序时，适用于 Apache Flink 的托管服务会自动创建以下 Amazon CloudWatch 资源（如果这些资源尚不存在）：
+ 名为 `/AWS/KinesisAnalytics-java/<my-application>` 的日志组
+ 名为 `kinesis-analytics-log-stream` 的日志流

## 设置本地开发环境
<a name="get-started-exercise-2"></a>

对于开发和调试，您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。任何 Apache Flink 依赖项都像使用 Apache Maven 的常规 Java 依赖项一样进行处理。

**注意**  
在开发计算机上，必须安装 Java JDK 11、Maven 和 Git。我们建议您使用开发环境（如 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/)）。要验证您是否满足所有先决条件，请参阅 [满足完成练习的先决条件](getting-started.md#setting-up-prerequisites)。您**无**需在计算机上安装 Apache Flink 集群。

### 对您的 AWS 会话进行身份验证
<a name="get-started-exercise-2-5"></a>

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时，您必须拥有有效的 AWS 经过身份验证的会话，并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证：

1. 如果您没有配置带有有效凭据 AWS CLI 的命名配置文件，请参阅[设置 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 通过发布以下测试记录，验证您的配置 AWS CLI 是否正确，并且您的用户有权写入 Kinesis 数据流：

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. 如果您的 IDE 有要集成的插件 AWS，则可以使用该插件将凭据传递给 IDE 中运行的应用程序。有关更多信息，请参阅[适用于 IntelliJ IDEA 的AWS Toolkit](https://aws.amazon.com/intellij/) 和[适用于 Eclipse 的AWS Toolkit](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)。

## 下载并检查 Apache Flink 流式处理 Java 代码
<a name="get-started-exercise-5"></a>

此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码，请执行以下操作：

1. 使用以下命令克隆远程存储库：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 导航到 `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` 目录。

### 审核应用程序组件
<a name="get-started-exercise-5-1"></a>

该应用程序完全在 `com.amazonaws.services.msf.BasicStreamingJob` 类中实施。`main()` 方法定义用于处理流数据的数据流程并运行它。

**注意**  
为优化开发人员体验，该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行，以便在您的 IDE 中进行开发。
+ 要读取运行时配置，使其在 Amazon Managed Service for Apache Flink 和 IDE 中能够正常运行，应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下，应用程序加载运行时配置的方式会有所不同：

  1. 当应用程序检测到其在 IDE 中以独立模式运行时，会形成包含在项目 **resources** 文件夹中的 `application_properties.json` 文件。该文件的内容如下所示。

  1. 当应用程序在 Amazon Managed Service for Apache Flink 中运行时，默认行为会根据您将在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅[创建并配置 Managed Service for Apache Flink 应用程序](#get-started-exercise-7)。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` 方法定义应用程序数据流程并运行它。
  + 初始化默认的流环境。在此示例中，我们展示了如何创建`StreamExecutionEnvironment`要用于 DataSteam API 的，以及`StreamTableEnvironment`要用于 SQL 和表 API 的。这两个环境对象是对同一个运行时环境的两个单独引用，用法不同 APIs。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + 加载应用程序配置参数。这将自动从正确的位置加载这些参数，具体取决于应用程序的运行位置：

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + 该应用程序使用 [Kinesis Consumer](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) 连接器定义一个源，用于从输入流中读取数据。输入流的配置以 `PropertyGroupId`=`InputStream0` 的方式定义。流的名称和区域分别位于名为 `stream.name` 和 `aws.region` 的属性中。为简单起见，此源将记录读取为字符串。

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + 然后，应用程序使用 [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) 连接器定义接收器，以将数据发送到输出流。与输入流类似，输出流的名称和区域以 `PropertyGroupId`=`OutputStream0` 的方式定义。接收器直接连接到从源获取数据的内部 `DataStream`。在实际应用程序中，需要在源和接收器之间进行一些转换。

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + 最后，运行刚才定义的数据流程。定义数据流程所需的所有运算符之后，这必须是 `main()` 方法的最后一条指令：

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### 使用 pom.xml 文件
<a name="get-started-exercise-5-2"></a>

pom.xml 文件定义应用程序所需的所有依赖项，并且设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
+ 有些依赖项具有 `provided` 范围。当应用程序在 Amazon Managed Service for Apache Flink 中运行时，这些依赖项自动可用。它们是编译应用程序或在 IDE 中本地运行应用程序所必需的依赖项。有关更多信息，请参阅 [在本地运行应用程序](#get-started-exercise-5-run)。请确保使用的 Flink 版本与 Amazon Managed Service for Apache Flink 中使用的运行时版本相同。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ 必须使用默认范围向 pom 添加其他 Apache Flink 依赖项，例如此应用程序使用的 [Kinesis 连接器](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)。有关更多信息，请参阅 [使用 Apache Flink 连接器](how-flink-connectors.md)。您还可以添加应用程序所需的任何其他 Java 依赖项。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Maven Java 编译器插件确保根据 Java 11 编译代码，Java 11 是 Apache Flink 目前支持的 JDK 版本。
+ Maven Shade 插件打包 fat-jar，但不包括运行时提供的一些库。它还指定两个转换器：`ServicesResourceTransformer` 和 `ManifestResourceTransformer`。后者配置包含启动应用程序的 `main` 方法的类。如果重命名主类，请不要忘记更新此转换器。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 将示例记录写入输入流
<a name="get-started-exercise-5-4"></a>

在本节中，使用 Python 脚本将示例记录写入流，以供应用程序处理。您可以通过两种方式生成示例数据，即使用 Python 脚本或使用 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)。

### 使用 Python 脚本生成示例数据
<a name="get-started-exercise-5-4-1"></a>

您可以使用 Python 脚本将示例记录发送到数据流。

**注意**  
要运行此 Python 脚本，必须使用 Python 3.x 并安装[适用于 Python 的AWS SDK（Boto）](https://aws.amazon.com/developer/language/python/)库。

**要开始向 Kinesis 输入流发送测试数据，请执行以下操作：**

1. 从数据生成器[ GitHub 存储库](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator)下载数据生成器 `stock.py` Python 脚本。

1. 运行 `stock.py` 脚本：

   ```
   $ python stock.py
   ```

在完成本教程的其余部分时，请将脚本保持运行状态。您现在可以运行 Apache Flink 应用程序。

### 使用 Kinesis Data Generator 生成示例数据
<a name="get-started-exercise-5-4-2"></a>

除了使用 Python 脚本之外，还可以使用 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)（也以[托管版本](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)提供）将随机示例数据发送到流中。Kinesis Data Generator 在浏览器中运行，无需在计算机上安装任何工具。

**要设置和运行 Kinesis Data Generator，请执行以下操作：**

1. 按照 [Kinesis Data Generator 文档](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)中的说明设置该工具的访问权限。您将运行一个用于设置用户和密码的 CloudFormation 模板。

1. 通过模板生成的网址访问 Kinesis 数据生成器。 CloudFormation CloudFormation 模板完成后，您可以在 “**输出**” 选项卡中找到 URL。

1. 配置数据生成器：
   + **区域：**选择您在本教程中使用的区域：us-east-1
   + **流/传输流：**选择应用程序将使用的输入流：`ExampleInputStream`
   + **每秒记录数：**100
   + **记录模板：**复制并粘贴以下模板：

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 测试模板：选择**测试模板**并验证生成的记录是否与以下内容类似：

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 启动数据生成器：选择**选择发送数据**。

Kinesis Data Generator 现在向 `ExampleInputStream` 发送数据。

## 在本地运行应用程序
<a name="get-started-exercise-5-run"></a>

您可以在 IDE 中本地运行和调试 Flink 应用程序。

**注意**  
在继续之前，请确认输入和输出流是否可用。请参阅[创建两个 Amazon Kinesis 数据流](#get-started-exercise-1)。此外，请确认您是否具有从两个流中读取和写入的权限。请参阅[对您的 AWS 会话进行身份验证](#get-started-exercise-2-5)。  
设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅[满足完成练习的先决条件](getting-started.md#setting-up-prerequisites)。

### 将 Java 项目导入您的 IDE
<a name="get-started-exercise-5-run-1"></a>

要开始在 IDE 中使用该应用程序，必须将其作为 Java 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中，请将 `./java/GettingStarted` 子目录中的内容导入 IDE。

使用 Maven 将代码作为现有 Java 项目插入。

**注意**  
导入新 Java 项目的确切过程因所使用的 IDE 而异。

### 检查本地应用程序配置
<a name="get-started-exercise-5-run-2"></a>

在本地运行时，应用程序使用 `./src/main/resources` 下项目资源文件夹中的 `application_properties.json` 文件内的配置。您可以编辑此文件以使用不同的 Kinesis 流名称或区域。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### 设置 IDE 运行配置
<a name="get-started-exercise-5-run-3"></a>

您可以像运行任何 Java 应用程序一样，通过运行主类 `com.amazonaws.services.msf.BasicStreamingJob` 直接从 IDE 运行和调试 Flink 应用程序。运行应用程序前，必须设置“运行”配置。该设置取决于您使用的 IDE。例如，请参阅 IntelliJ IDEA 文档中的[运行/调试配置](https://www.jetbrains.com/help/idea/run-debug-configuration.html)。具体而言，您必须设置以下内容：

1. **将 `provided` 依赖项添加到类路径中**。这是确保在本地运行时将具有 `provided` 范围的依赖项传递给应用程序所必需的条件。如果不进行此设置，应用程序会立即显示 `class not found` 错误。

1. **将访问 Kinesis 直播的 AWS 凭证传递给应用程序。**最快的方法是使用[适用于 IntelliJ IDEA 的AWS Toolkit](https://aws.amazon.com/intellij/)。在 “运行” 配置中使用此 IDE 插件，可以选择特定的 AWS 配置文件。 AWS 使用此配置文件进行身份验证。您无需直接传递 AWS 凭证。

1. 验证 IDE 是否使用 **JDK 11** 运行应用程序。

### 在 IDE 中运行应用程序
<a name="get-started-exercise-5-run-4"></a>

设置 `BasicStreamingJob` 的“运行”配置后，您可以像常规 Java 应用程序一样运行或调试它。

**注意**  
不能从命令行使用 `java -jar ...` 直接运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。

当应用程序成功启动时，它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

初始化完成后，应用程序不会再发出任何日志条目。**当数据流动时，不会发出任何日志。**

要验证应用程序是否正确处理数据，您可以检查输入和输出 Kinesis 流，如下一节所述。

**注意**  
 Flink 应用程序的正常行为是不发出有关流动数据的日志。在每条记录上发出日志可能便于调试，但在生产环境中运行时可能会增加大量开销。

## 观察 Kinesis 流中的输入和输出数据
<a name="get-started-exercise-input-output"></a>

您可以使用 Amazon Kinesis 控制台中的**数据查看器**观察由（生成示例 Python）或 Kinesis Data Generator（链接）发送到输入流的记录。

**观察记录**

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 验证区域是否与运行本教程的区域相同，默认为 us-east-1 美国东部（弗吉尼亚州北部）。如果区域不匹配，请更改区域。

1. 选择**数据流**。

1. 选择您要观察的流，即 `ExampleInputStream` 或 `ExampleOutputStream.`

1. 选择**数据查看器**选项卡。

1. 选择任意**分片**，保持**最新**作为**起始位置**，然后选择**获取记录**。您可能会看到“未找到该请求的记录”错误。如果看到此错误，请选择**重试获取记录**。发布到流显示的最新记录。

1. 在“数据”列中选择值以检查 JSON 格式的记录内容。

## 停止在本地运行的应用程序
<a name="get-started-exercise-stop"></a>

停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于您使用的 IDE。

## 编译并打包您的应用程序代码
<a name="get-started-exercise-5-5"></a>

在本节中，您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。

**要使用 Maven 命令行进行编译和打包，请执行以下操作：**

移至包含 Java GettingStarted 项目的目录并运行以下命令：

```
$ mvn package
```

**要使用 IDE 进行编译和打包，请执行以下操作：**

从 IDE Maven 集成中运行 `mvn package`。

在这两种情况下，都会创建以下 JAR 文件：`target/amazon-msf-java-stream-app-1.0.jar`。

**注意**  
 从 IDE 运行“构建项目”可能无法创建 JAR 文件。

## 上传应用程序代码 JAR 文件
<a name="get-started-exercise-6"></a>

在本节中，您要将在上一节中创建的 JAR 文件上传到在本教程开始时创建的 Amazon Simple Storage Service（Amazon S3）存储桶中。如果您尚未完成此步骤，请参阅（链接）。

**上传应用程序代码 JAR 文件**

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择您之前为应用程序代码创建的存储桶。

1. 选择**上传**。

1. 选择**添加文件**。

1. 导航到上一步中生成的 JAR 文件：`target/amazon-msf-java-stream-app-1.0.jar`。

1. 在不更改任何其他设置的情况下选择**上传**。

**警告**  
确保在 `<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar` 中选择正确的 JAR 文件。  
`target` 目录还包含您无需上传的其他 JAR 文件。

## 创建并配置 Managed Service for Apache Flink 应用程序
<a name="get-started-exercise-7"></a>

您可以使用控制台或 AWS CLI创建和运行适用于 Apache Flink 的托管服务的应用程序。在本教程中，您将使用控制台。

**注意**  
当您使用控制台创建应用程序时，系统会为您创建您的 AWS Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。使用创建应用程序时 AWS CLI，可以单独创建这些资源。

**Topics**
+ [创建应用程序](#get-started-exercise-7-console-create)
+ [编辑 IAM 策略](#get-started-exercise-7-console-iam)
+ [配置应用程序](#get-started-exercise-7-console-configure)
+ [运行应用程序](#get-started-exercise-7-console-run)
+ [观察运行中应用程序的指标](#get-started-exercise-7-console-stop)
+ [观察 Kinesis 流中的输出数据](#get-started-exercise-7-console-output)
+ [停止应用程序](#get-started-exercise-stop)

### 创建应用程序
<a name="get-started-exercise-7-console-create"></a>

**创建应用程序**

1. 登录并通过 /f AWS 管理控制台 link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com

1. 确认选择正确的区域：us-east-1 美国东部（弗吉尼亚北部）

1. 打开右侧的菜单，选择 **Apache Flink 应用程序**，然后选择**创建流应用程序**。或者，在初始页面的“入门”容器中选择**创建流应用程序**。

1. 在**创建流应用程序**页面上：
   + **选择设置流处理应用程序的方法：**选择**从头开始创建**。
   + **Apache Flink 配置，应用程序 Flink 版本：**选择 **Apache Flink 1.20**。

1. 配置应用程序
   + **应用程序名称：**输入 **MyApplication**。
   + **描述**：输入 **My java test app**。
   + **访问应用程序资源：**选择**使用所需策略创建/更新 IAM 角色 `kinesis-analytics-MyApplication-us-east-1`**。

1. 配置**用于应用程序设置的模板**
   + **模板：**选择**开发**。

1. 选择页面底部的**创建流应用程序**。

**注意**  
在使用控制台创建应用程序的 Managed Service for Apache Flink时，您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的，如下所示：  
策略：`kinesis-analytics-service-MyApplication-us-east-1`
角色：`kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service for Apache Flink 之前称为 Kinesis Data Analytics。为实现向后兼容，自动创建的资源名称前缀为 `kinesis-analytics-`。

### 编辑 IAM 策略
<a name="get-started-exercise-7-console-iam"></a>

编辑 IAM policy 以添加访问 Kinesis 数据流的权限。

**编辑策略**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 选择**策略**。选择控制台在上一部分中为您创建的 **`kinesis-analytics-service-MyApplication-us-east-1`** 策略。

1. 选择**编辑**，然后选择 **JSON** 选项卡。

1. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 IDs (*012345678901*) 替换为您的账户 ID。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  选择页面底部的**下一步**，然后选择**保存更改**。

### 配置应用程序
<a name="get-started-exercise-7-console-configure"></a>

编辑应用程序配置以设置应用程序代码构件。

**编辑配置**

1. 在**MyApplication**页面上，选择**配置**。

1. 在**应用程序代码位置**部分中：
   + 对于 **Amazon S3 存储桶**，请选择之前为应用程序代码创建的存储桶。选择**浏览**并选择正确的存储桶，然后选择**选择**。请勿单击存储桶名称。
   + 在 **Amazon S3 对象的路径**中，输入 **amazon-msf-java-stream-app-1.0.jar**。

1. 对于**访问权限**，请选择**创建/更新具有必要策略的 IAM 角色 `kinesis-analytics-MyApplication-us-east-1`**。

1. 在**运行时属性**部分中，添加以下属性。

1. 选择**添加新项目**并添加以下每个参数：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/get-started-exercise.html)

1. 请勿修改任何其他部分。

1. 选择**保存更改**。

**注意**  
当您选择启用 Amazon CloudWatch 日志时，适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示：  
日志组：`/aws/kinesis-analytics/MyApplication`
日志流：`kinesis-analytics-log-stream`

### 运行应用程序
<a name="get-started-exercise-7-console-run"></a>

应用程序现已完成配置，准备好运行。

**运行应用程序**

1. 在 Amazon Managed Service for Apache Flink 的控制台上，选择**我的应用程序**，然后选择**运行**。

1. 在下一页的“应用程序还原配置”页面上，选择**使用最新快照运行**，然后选择**运行**。

   **应用程序详细信息**中的**状态**会从 `Ready` 转换到 `Starting`，然后在应用程序启动时转换到 `Running`。

当应用程序处于 `Running` 状态时，您现在可以打开 Flink 控制面板。

**打开控制面板**

1. 选择**打开 Apache Flink 控制面板**。控制面板将在新页面上打开。

1. 在**正在运行的作业**列表中，选择您可以看到的单个作业。
**注意**  
如果您错误设置运行时属性或编辑 IAM 策略，则应用程序状态可能会变为 `Running`，但是 Flink 控制面板显示任务正在持续重新启动。如果应用程序配置错误或缺乏访问外部资源的权限，则通常会出现这种故障场景。  
发生这种情况时，请检查 Flink 控制面板中的**异常**选项卡以查看问题的原因。

### 观察运行中应用程序的指标
<a name="get-started-exercise-7-console-stop"></a>

在该**MyApplication**页面的 **Amazon CloudWatch 指标**部分，您可以看到正在运行的应用程序中的一些基本指标。

**查看指标**

1. 在**刷新**按钮旁边，从下拉列表中选择 **10 秒**。

1. 当应用程序运行且运行状况良好时，您可以看到**正常运行时间**指标不断增加。

1. **完全重新启动**指标应为零。如果该指标增加，则配置可能存在问题。要调查问题，请审查 Flink 控制面板上的**异常**选项卡。

1. 在运行状况良好的应用程序中，**失败的检查点数**指标应为零。
**注意**  
此控制面板显示一组固定的指标，且粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。

### 观察 Kinesis 流中的输出数据
<a name="get-started-exercise-7-console-output"></a>

确保您仍在使用 Python 脚本或 Kinesis Data Generator 将数据发布到输入中。

现在，您可以使用中的数据查看器来观察在 Apache Flink 托管服务上运行的应用程序的输出 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)，就像之前所做的那样。

**查看输出**

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 确认该区域与您运行本教程时使用的区域相同。默认情况下，区域为 US-East-1US 东部（弗吉尼亚北部）。如有必要，可以更改区域。

1. 选择**数据流**。

1. 选择您要观察的流。在本教程中，请使用 `ExampleOutputStream`。

1.  选择**数据查看器**选项卡。

1. 选择任意**分片**，保持**最新**作为**起始位置**，然后选择**获取记录**。您可能会看到“未找到该请求的记录”错误。如果看到此错误，请选择**重试获取记录**。发布到流显示的最新记录。

1. 在“数据”列中选择值以检查 JSON 格式的记录内容。

### 停止应用程序
<a name="get-started-exercise-stop"></a>

要停止应用程序，请转至名为 `MyApplication` 的 Managed Service for Apache Flink 应用程序的控制台页面。

**停止应用程序**

1. 从**操作**下拉列表中，选择**停止**。

1. **应用程序详细信息**中的**状态**会从 `Running` 转换到 `Stopping`，然后在应用程序完全停止时转换到 `Ready`。
**注意**  
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。

## 后续步骤
<a name="get-started-exercise-next-step-4"></a>

[清理 AWS 资源](getting-started-cleanup.md)

# 清理 AWS 资源
<a name="getting-started-cleanup"></a>

本节包括清理本入门 (DataStream API) 教程中创建的 AWS 资源的过程。

**Topics**
+ [删除 Managed Service for Apache Flink 应用程序](#getting-started-cleanup-app)
+ [删除您的 Kinesis 数据流](#getting-started-cleanup-stream)
+ [删除您的 Amazon S3 对象和存储桶](#getting-started-cleanup-s3)
+ [删除您的 IAM 资源](#getting-started-cleanup-iam)
+ [删除您的 CloudWatch 资源](#getting-started-cleanup-cw)
+ [探索 Apache Flink 的其他资源](#getting-started-cleanup-next-step-5)

## 删除 Managed Service for Apache Flink 应用程序
<a name="getting-started-cleanup-app"></a>

请使用以下过程来删除应用程序。

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在 “适用于 Apache Flink 的托管服务” 面板中，选择。**MyApplication**

1. 从**操作**下拉列表中，选择**删除**，然后确认删除。

## 删除您的 Kinesis 数据流
<a name="getting-started-cleanup-stream"></a>

1. 登录并打开亚马逊 MSF 控制台 AWS 管理控制台，网址为 https://console.aws.amazon.com /flink...

1. 选择**数据流**。

1. 选择您创建的两个流：`ExampleInputStream` 和 `ExampleOutputStream`。

1. 从**操作**下拉列表中，选择**删除**，然后确认删除。

## 删除您的 Amazon S3 对象和存储桶
<a name="getting-started-cleanup-s3"></a>

请使用以下过程删除 Amazon S3 对象和存储桶。

**从 S3 存储桶中删除对象**

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择您为应用程序构件创建的 S3 存储桶。

1. 选择您上传的名为 `amazon-msf-java-stream-app-1.0.jar` 的应用程序构件。

1. 选择**删除**，然后确认删除。

**删除 S3 存储桶**

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择您为构件创建的存储桶。

1. 选择**删除**，然后确认删除。
**注意**  
S3 存储桶必须为空才能将其删除。

## 删除您的 IAM 资源
<a name="getting-started-cleanup-iam"></a>

**删除您的 IAM 资源**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在导航栏中，选择**策略**。

1. 在筛选条件控件中，输入 **kinesis**。

1. 选择 **kinesis-analytics-service--us-MyApplication east-1 策略**。

1. 选择 **策略操作**，然后选择 **删除**。

1. 在导航栏中，选择 **角色**。

1. 选择 k **inesis-analytics-us-east-1 角色MyApplication**。

1. 选择 **删除角色**，然后确认删除。

## 删除您的 CloudWatch 资源
<a name="getting-started-cleanup-cw"></a>

1. 打开 CloudWatch 控制台，网址为[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 在导航栏中，选择 **日志**。

1. 选择**/aws/kinesis-analytics/MyApplication**日志组。

1. 选择 **删除日志组**，然后确认删除。

## 探索 Apache Flink 的其他资源
<a name="getting-started-cleanup-next-step-5"></a>

[探索其他资源](getting-started-next-steps.md)

# 探索其他资源
<a name="getting-started-next-steps"></a>

现在，您已经创建并运行了 Managed Service for Apache Flink 应用程序，请参阅以下资源，了解更多 Managed Service for Apache Flink 解决方案。
+ **[适用于 Apache Flink Workshop 的 Amazon 托管服务](https://catalog.workshops.aws/managed-flink)：**在本研讨会中，您将构建一个 end-to-end流式架构，以近乎实时的方式摄取、分析和可视化流数据。您着手改善纽约市一家出租车公司的运营。您可以近乎实时地分析纽约市出租车队的遥测数据，以优化其车队运营。
+ **[创建和使用 Managed Service for Apache Flink 应用程序的示例](examples-collapsibles.md)：**本开发人员指南的这一部分提供了在 Managed Service for Apache Flink 中创建和使用应用程序的示例。它们包括示例代码和 step-by-step说明，可帮助您为 Apache Flink 应用程序创建托管服务并测试结果。
+ **[学习 Flink：动手训练](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/learn-flink/overview/)：**Apache Flink 官方入门培训，让您开始编写可扩展的流媒体 ETL、分析和事件驱动的应用程序。