

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 入门（Scala）
<a name="examples-gs-scala"></a>

**注意**  
从 Flink 1.15 版本开始，Scala 免费。应用程序现在可以使用任何 Scala 版本的 Java API。Flink 仍然在内部的一些关键组件中使用 Scala，但没有将 Scala 暴露到用户代码类加载器中。因此，您必须将 Scala 依赖项添加到自己的 JAR 存档中。  
有关 Flink 1.15 中 Scala 变更的更多信息，请参阅 [1.15 Scala 免费](https://flink.apache.org/2022/02/22/scala-free.html)。

在本练习中，您将创建面向 Scala 的 Managed Service for Apache Flink 应用程序，并将 Kinesis 流作为源和接收器。

**Topics**
+ [创建相关资源](#examples-gs-scala-resources)
+ [将示例记录写入输入流](#examples-gs-scala-write)
+ [下载并检查应用程序代码](#examples-gs-scala-download)
+ [编译并上传应用程序代码](#examples-gs-scala-upload)
+ [创建并运行应用程序（控制台）](gs-scala-7.md)
+ [创建并运行应用程序 (CLI)](examples-gs-scala-create-run-cli.md)
+ [清理 AWS 资源](examples-gs-scala-cleanup.md)

## 创建相关资源
<a name="examples-gs-scala-resources"></a>

在本练习中，创建Managed Service for Apache Flink的应用程序之前，您需要创建以下从属资源：
+ 两个 Kinesis 流用于输入和输出。
+ 存储应用程序代码 (`ka-app-code-<username>`) 的 Amazon S3 存储桶 

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明，请参阅以下主题：
+ *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。将数据流命名为 **ExampleInputStream** 和 **ExampleOutputStream**。

  创建数据流 (AWS CLI)
  + 要创建第一个直播 (`ExampleInputStream`)，请使用以下 Amazon Kinesis 创建 AWS CLI 流命令。

    ```
    aws kinesis create-stream \
        --stream-name ExampleInputStream \
        --shard-count 1 \
        --region us-west-2 \
        --profile adminuser
    ```
  + 要创建应用程序用来写入输出的第二个流，请运行同一命令（将流名称更改为 `ExampleOutputStream`）。

    ```
    aws kinesis create-stream \
        --stream-name ExampleOutputStream \
        --shard-count 1 \
        --region us-west-2 \
        --profile adminuser
    ```
+ *《Amazon Simple Storage Service 用户指南》*中的[如何创建 S3 存储桶？](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。附加您的登录名，以便为 Amazon S3 存储桶指定全局唯一的名称，例如 **ka-app-code-*<username>***。

**其他资源**

在您创建应用程序时，适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源（如果这些资源尚不存在）：
+ 名为 `/AWS/KinesisAnalytics-java/MyApplication` 的日志组
+ 名为 `kinesis-analytics-log-stream` 的日志流

## 将示例记录写入输入流
<a name="examples-gs-scala-write"></a>

在本节中，您使用 Python 脚本将示例记录写入流，以供应用程序处理。

**注意**  
此部分需要 [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/)。

**注意**  
本节中的 Python 脚本使用 AWS CLI。您必须将您的配置 AWS CLI 为使用您的账户凭证和默认区域。要配置您的 AWS CLI，请输入以下内容：  

```
aws configure
```

1. 使用以下内容创建名为 `stock.py` 的文件：

   ```
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           'event_time': datetime.datetime.now().isoformat(),
           'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
           'price': round(random.random() * 100, 2)}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(data),
               PartitionKey="partitionkey")
   
   
   if __name__ == '__main__':
       generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
   ```

1. 运行 `stock.py` 脚本：

   ```
   $ python stock.py
   ```

   在完成本教程的其余部分时，请将脚本保持运行状态。

## 下载并检查应用程序代码
<a name="examples-gs-scala-download"></a>

此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码，请执行以下操作：

1. 如果尚未安装 Git 客户端，请安装它。有关更多信息，请参阅[安装 Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。

1. 使用以下命令克隆远程存储库：

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. 导航到 `amazon-kinesis-data-analytics-java-examples/scala/GettingStarted` 目录。

请注意有关应用程序代码的以下信息：
+ `build.sbt` 文件包含有关应用程序的配置和从属项的信息，包括Managed Service for Apache Flink的库。
+ `BasicStreamingJob.scala` 文件包含定义应用程序功能的主要方法。
+ 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源：

  ```
  private def createSource: FlinkKinesisConsumer[String] = {
    val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
    val inputProperties = applicationProperties.get("ConsumerConfigProperties")
  
    new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName),
      new SimpleStringSchema, inputProperties)
  }
  ```

  该应用程序还使用 Kinesis 接收器写入结果流。以下代码段创建 Kinesis 接收器：

  ```
  private def createSink: KinesisStreamsSink[String] = {
    val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
    val outputProperties = applicationProperties.get("ProducerConfigProperties")
  
    KinesisStreamsSink.builder[String]
      .setKinesisClientProperties(outputProperties)
      .setSerializationSchema(new SimpleStringSchema)
      .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
      .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
      .build
  }
  ```
+ 应用程序创建源连接器和接收器连接器，以使用 StreamExecutionEnvironment 对象访问外部资源。
+ 该应用程序将使用动态应用程序属性创建源和接收连接器。读取应用程序的运行时系统属性来配置连接器。有关运行时系统属性的更多信息，请参阅[运行时系统属性](https://docs.aws.amazon.com/managed-flink/latest/java/how-properties.html)。

## 编译并上传应用程序代码
<a name="examples-gs-scala-upload"></a>

在本节中，您将编译应用程序代码并将其上传到您在 [创建相关资源](#examples-gs-scala-resources) 节中创建的 Amazon S3 存储桶。

**编译应用程序代码**

在本节中，您使用 [SBT](https://www.scala-sbt.org/) 构建工具为应用程序构建 Scala 代码。要安装 SBT，请参阅[使用 cs 安装程序安装 sbt](https://www.scala-sbt.org/download.html)。您还需要安装 Java 开发工具包 (JDK)。参阅[完成练习的先决条件](https://docs.aws.amazon.com/managed-flink/latest/java/getting-started.html#setting-up-prerequisites)。

1. 要使用您的应用程序代码，您将其编译和打包成 JAR 文件。您可以用 SBT 编译和打包代码：

   ```
   sbt assembly
   ```

1. 如果应用程序成功编译，则创建以下文件：

   ```
   target/scala-3.2.0/getting-started-scala-1.0.jar
   ```

**上传 Apache Flink 流式处理 Scala 代码**

在本节中，创建 Amazon S3 存储桶并上传应用程序代码。

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择**创建存储桶**。

1. 在 **存储桶名称** 字段中输入 `ka-app-code-<username>`。将后缀（如您的用户名）添加到存储桶名称，以使其具有全局唯一性。选择**下一步**。

1. 在**配置选项**中，让设置保持原样，然后选择**下一步**。

1. 在**设置权限**中，让设置保持原样，然后选择**下一步**。

1. 选择 **创建存储桶 **。

1. 选择 `ka-app-code-<username>` 存储桶，然后选择**上传**。

1. 在**选择文件**步骤中，选择**添加文件**。导航到您在上一步中创建的 `getting-started-scala-1.0.jar` 文件。

1. 您无需更改该对象的任何设置，因此，请选择**上传**。

您的应用程序代码现在存储在 Amazon S3 存储桶中，应用程序可以在其中访问代码。