

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建并运行应用程序 (CLI)
<a name="examples-gs-scala-create-run-cli"></a>

在本节中，您将使用创建和运行适用 AWS Command Line Interface 于 Apache Flink 的托管服务应用程序。使用 k *inesisanalyticsv2* AWS CLI 命令为 Apache Flink 应用程序创建托管服务并与之交互。

## 创建权限策略
<a name="examples-gs-scala-permissions"></a>

**注意**  
您必须为应用程序创建一个权限策略和角色。如果未创建这些 IAM 资源，应用程序将无法访问其数据和日志流。

首先，使用两个语句创建权限策略：一个语句授予对源流执行读取操作的权限，另一个语句授予对接收器流执行写入操作的权限。然后，将策略附加到 IAM 角色（下一部分中将创建此角色）。因此，在适用于 Apache Flink 的托管服务代入该角色时，服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 `AKReadSourceStreamWriteSinkStream` 权限策略。将 **username** 替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARNs) 中的账户 ID **(012345678901)** 替换为您的账户 ID。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "ReadCode",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": [
                "arn:aws:s3:::ka-app-code-username/getting-started-scala-1.0.jar"
            ]
        },
        {
            "Sid": "DescribeLogGroups",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogGroups"
            ],
            "Resource": [
                "arn:aws:logs:us-west-2:123456789012:*"
            ]
        },
        {
            "Sid": "DescribeLogStreams",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogStreams"
            ],
            "Resource": [
                "arn:aws:logs:us-west-2:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
            ]
        },
        {
            "Sid": "PutLogEvents",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-west-2:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/ExampleOutputStream"
        }
    ]
}
```

------

有关创建权限策略的 step-by-step说明，请参阅 *IAM 用户指南*中的[教程：创建并附加您的第一个客户托管策略](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy)。

## 创建 IAM 策略
<a name="examples-gs-scala-iam-policy"></a>

在本节中，您将创建一个 IAM 角色，应用程序的 Managed Service for Apache Flink可以代入此角色来读取源流和写入接收器流。

权限不足时，Managed Service for Apache Flink 无法访问您的串流。您通过 IAM 角色授予这些权限。每个 IAM 角色附加了两种策略。此信任策略授予适用于 Apache Flink 的托管服务代入该角色的权限，权限策略确定适用于 Apache Flink 的托管服务代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

**创建 IAM 角色**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在导航窗格中选择**角色**，然后选择**创建角色**。

1. 在 **选择受信任实体的类型** 下，选择 **AWS 服务**。

1. 在 **选择将使用此角色的服务** 下，选择 **Kinesis**。

1. 在**选择您的用例**部分，选择**Managed Service for Apache Flink**。

1. 选择**下一步: 权限**。

1. 在 **附加权限策略** 页面上，选择 **下一步: 审核**。在创建角色后，您可以附加权限策略。

1. 在 **创建角色** 页面上，输入**MF-stream-rw-role**作为**角色名称**。选择 **创建角色**。

    现在，您已经创建了一个名为 `MF-stream-rw-role` 的新 IAM 角色。接下来，您更新角色的信任和权限策略。

1. 将权限策略附加到角色。
**注意**  
对于本练习，Managed Service for Apache Flink代入此角色，以便同时从 Kinesis 数据流（源）读取数据和将输出写入另一个 Kinesis 数据流。因此，您附加在上一步[创建权限策略](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html#get-started-exercise-7-cli-policy)中创建的策略。

   1. 在 **摘要** 页上，选择 **权限** 选项卡。

   1. 选择**附加策略**。

   1. 在搜索框中，输入 **AKReadSourceStreamWriteSinkStream**（您在上一部分中创建的策略）。

   1. 选择`AKReadSourceStreamWriteSinkStream`策略，然后选择**附加策略**。

现在，您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。

有关创建角色的 step-by-step说明，请参阅 [IAM *用户指南中的创建 IAM* 角色（控制台）](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console)。

## 创建应用程序
<a name="examples-gs-scala-create-application-cli"></a>

将以下 JSON 代码保存到名为 `create_request.json` 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀 (用户名) 替换为在前一部分中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

```
{
    "ApplicationName": "getting_started",
    "ApplicationDescription": "Scala getting started application",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "getting-started-scala-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2",
                    "stream.name" : "ExampleInputStream",
                    "flink.stream.initpos" : "LATEST"
               }
            },
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2",
                    "stream.name" : "ExampleOutputStream"
               }
            }
         ]
      }
    },
    "CloudWatchLoggingOptions": [ 
      { 
         "LogStreamARN": "arn:aws:logs:us-west-2:012345678901:log-group:MyApplication:log-stream:kinesis-analytics-log-stream"
      }
   ]
}
```

[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)使用以下请求执行以创建应用程序：

```
aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
```

应用程序现已创建。您在下一步中启动应用程序。

## 启动应用程序
<a name="examples-gs-scala-start"></a>

在本节中，您使用 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 操作来启动应用程序。

**启动应用程序**

1. 将以下 JSON 代码保存到名为 `start_request.json` 的文件中。

   ```
   {
       "ApplicationName": "getting_started",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. 使用上述请求执行 `StartApplication` 操作来启动应用程序：

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

应用程序正在运行。您可以在亚马逊 CloudWatch 控制台上查看托管服务的 Apache Flink 指标，以验证应用程序是否正常运行。

## 停止应用程序
<a name="examples-s3sink-scala-stop"></a>

在本节中，您使用 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 操作来停止应用程序。

**停止应用程序**

1. 将以下 JSON 代码保存到名为 `stop_request.json` 的文件中。

   ```
   {
      "ApplicationName": "s3_sink"
   }
   ```

1. 使用上述请求执行 `StopApplication` 操作来停止应用程序：

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

应用程序现已停止。

## 添加 CloudWatch 日志选项
<a name="examples-s3sink-scala-cw-option"></a>

您可以使用将 Amazon CloudWatch 日志流 AWS CLI 添加到您的应用程序中。有关在应用程序中使用 CloudWatch 日志的信息，请参阅[设置应用程序日志记录](https://docs.aws.amazon.com/managed-flink/latest/java/cloudwatch-logs.html)。

## 更新环境属性
<a name="examples-s3sink-scala-update-environment-properties"></a>

在本节中，您使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 操作更改应用程序的环境属性，而无需重新编译应用程序代码。在该示例中，您更改源流和目标流的区域。

**更新应用程序的环境属性**

1. 将以下 JSON 代码保存到名为 `update_properties_request.json` 的文件中。

   ```
   {
         "ApplicationName": "getting_started",
          "CurrentApplicationVersionId": 1,
          "ApplicationConfigurationUpdate": { 
           "EnvironmentPropertyUpdates": { 
              "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2",
                       "stream.name" : "ExampleInputStream",
                       "flink.stream.initpos" : "LATEST"
                  }
               },
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2",
                       "stream.name" : "ExampleOutputStream"
                  }
               }
              ]
           } 
       }
   ```

1. 使用前面的请求执行 `UpdateApplication` 操作以更新环境属性：

   ```
   aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
   ```

## 更新应用程序代码
<a name="examples-s3sink-scala-update-app-code"></a>

当您需要使用新版本的代码包更新应用程序代码时，可以使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)CLI 操作。

**注意**  
要使用相同的文件名加载新版本的应用程序代码，您必须指定新的对象版本。有关使用 Amazon S3 对象版本的更多信息，请参阅[启用或禁用版本控制](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-versioning.html)。

要使用 AWS CLI，请从 Amazon S3 存储桶中删除之前的代码包，上传新版本，然后调用`UpdateApplication`，指定相同的 Amazon S3 存储桶和对象名称以及新的对象版本。应用程序将使用新的代码包重新启动。

以下示例 `UpdateApplication` 操作请求重新加载应用程序代码并重新启动应用程序。将 `CurrentApplicationVersionId` 更新为当前的应用程序版本。您可以使用 `ListApplications` 或 `DescribeApplication` 操作检查当前的应用程序版本。将存储桶名称后缀 (<用户名>) 更新为在[创建相关资源](examples-gs-scala.md#examples-gs-scala-resources)一节中选择的后缀。

```
{{
    "ApplicationName": "getting_started",
    "CurrentApplicationVersionId": 1,
    "ApplicationConfigurationUpdate": {
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "BucketARNUpdate": "arn:aws:s3:::ka-app-code-<username>",
                    "FileKeyUpdate": "getting-started-scala-1.0.jar",
                    "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU"
                }
            }
        }
    }
}
```