

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 開始使用 (Scala)
<a name="examples-gs-scala"></a>

**注意**  
從 1.15 版開始，Flink 不含 Scala。應用程式現在可以使用任何 Scala 版本的 Java API。Flink 仍會在內部幾個關鍵元件中使用 Scala，但不會向使用者程式碼類別載入器公開 Scala。因此，您必須將 Scala 相依性新增至 JAR 封存檔。  
如需 Flink 1.15 中的 Scala 變更之詳細資訊，請參閱[在 1.15 版中移除了 Scala 相依性](https://flink.apache.org/2022/02/22/scala-free.html)。

在本練習中，您會使用 Kinesis 串流做為來源和接收器，為 Scala 建立 Managed Service for Apache Flink 應用程式。

**Topics**
+ [建立相依資源](#examples-gs-scala-resources)
+ [將範例記錄寫入輸入串流](#examples-gs-scala-write)
+ [下載並檢查應用程式程式碼](#examples-gs-scala-download)
+ [編譯和上傳應用程式的程式碼](#examples-gs-scala-upload)
+ [建立並執行應用程式 （主控台）](gs-scala-7.md)
+ [建立並執行應用程式 (CLI)](examples-gs-scala-create-run-cli.md)
+ [清除 AWS 資源](examples-gs-scala-cleanup.md)

## 建立相依資源
<a name="examples-gs-scala-resources"></a>

在為本練習建立 Managed Service for Apache Flink 應用程式之前，先建立下列相依資源：
+ 兩個 Kinesis 串流，用於輸入和輸出。
+ Amazon S3 儲存貯體，用來儲存應用程式的程式碼 (`ka-app-code-<username>`) 

您可以在主控台中建立 Kinesis 串流和 Amazon S3 儲存貯體。如需建立這些資源的相關指示，請參閱以下主題：
+ 《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。為資料串流 **ExampleInputStream** 和 **ExampleOutputStream** 命名。

  建立資料串流 (AWS CLI)
  + 若要建立第一個串流 (`ExampleInputStream`)，請使用下列 Amazon Kinesis create-stream AWS CLI 命令。

    ```
    aws kinesis create-stream \
        --stream-name ExampleInputStream \
        --shard-count 1 \
        --region us-west-2 \
        --profile adminuser
    ```
  + 若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，將串流名稱變更為 `ExampleOutputStream`。

    ```
    aws kinesis create-stream \
        --stream-name ExampleOutputStream \
        --shard-count 1 \
        --region us-west-2 \
        --profile adminuser
    ```
+ 《Amazon Simple Storage Service 使用者指南》中的[如何建立 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。**透過附加登入名稱 (例如 **ka-app-code-*<username>***)，為 Amazon S3 儲存貯體提供全域唯一的名稱。

**其他資源**

建立應用程式時，Managed Service for Apache Flink 會建立下列 Amazon CloudWatch 資源 (如果尚不存在該資源)：
+ 名為 `/AWS/KinesisAnalytics-java/MyApplication` 的日誌群組。
+ 名為 `kinesis-analytics-log-stream` 的日誌串流。

## 將範例記錄寫入輸入串流
<a name="examples-gs-scala-write"></a>

在本節，您會使用 Python 指令碼將範例記錄寫入供應用程式處理的串流。

**注意**  
本節需要 [適用於 Python (Boto) 的 AWS SDK](https://aws.amazon.com/developers/getting-started/python/)。

**注意**  
本節中的 Python 指令碼會使用 AWS CLI。您必須 AWS CLI 將 設定為使用您的帳戶登入資料和預設區域。若要設定您的 AWS CLI，請輸入下列項目：  

```
aws configure
```

1. 使用下列內容建立名為 `stock.py` 的檔案：

   ```
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           'event_time': datetime.datetime.now().isoformat(),
           'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
           'price': round(random.random() * 100, 2)}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(data),
               PartitionKey="partitionkey")
   
   
   if __name__ == '__main__':
       generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
   ```

1. 執行 `stock.py` 指令碼：

   ```
   $ python stock.py
   ```

   在完成教學課程的其餘部分時，讓指令碼保持執行狀態。

## 下載並檢查應用程式程式碼
<a name="examples-gs-scala-download"></a>

此範例的 Python 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼，請執行下列動作：

1. 如果您尚未安裝 Git 用戶端，請先安裝。如需詳細資訊，請參閱[安裝 Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. 導覽至 `amazon-kinesis-data-analytics-java-examples/scala/GettingStarted` 目錄。

請留意下列與應用程式的程式碼相關的資訊：
+ `build.sbt` 檔案包含應用程式的組態和相依性資訊，包括 Managed Service for Apache Flink 程式庫。
+ `BasicStreamingJob.scala` 檔案包含定義應用程式功能的主要方法。
+ 應用程式使用 Kinesis 來源從來源串流讀取。以下程式碼片段會建立 Kinesis 來源：

  ```
  private def createSource: FlinkKinesisConsumer[String] = {
    val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
    val inputProperties = applicationProperties.get("ConsumerConfigProperties")
  
    new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName),
      new SimpleStringSchema, inputProperties)
  }
  ```

  應用程式也會使用 Kinesis 接收器寫入結果串流。以下程式碼片段會建立 Kinesis 目的地：

  ```
  private def createSink: KinesisStreamsSink[String] = {
    val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
    val outputProperties = applicationProperties.get("ProducerConfigProperties")
  
    KinesisStreamsSink.builder[String]
      .setKinesisClientProperties(outputProperties)
      .setSerializationSchema(new SimpleStringSchema)
      .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
      .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
      .build
  }
  ```
+ 應用程式會建立來源與目的地連接器，以使用 StreamExecutionEnvironment 物件來存取外部資源。
+ 應用程式會使用動態應用程式屬性來建立來源與目的地連接器。會讀取執行期應用程式的屬性，來設定連接器。如需執行期屬性的詳細資訊，請參閱[執行期屬性](https://docs.aws.amazon.com/managed-flink/latest/java/how-properties.html)。

## 編譯和上傳應用程式的程式碼
<a name="examples-gs-scala-upload"></a>

在本節中，您會編譯應用程式的程式碼，並將其上傳至在[建立相依資源](#examples-gs-scala-resources)一節建立的 Amazon S3 儲存貯體。

**編譯應用程式的程式碼**

在本節中，您將使用 [SBT](https://www.scala-sbt.org/) 建置工具來建置應用程式的 Scala 程式碼。若要安裝 SBT，請參閱[使用 cs 安裝程式安裝 sbt](https://www.scala-sbt.org/download.html)。您還需要安裝 Java 開發套件 (JDK)。請參閱[完成練習的先決條件](https://docs.aws.amazon.com/managed-flink/latest/java/getting-started.html#setting-up-prerequisites)。

1. 請將應用程式的程式碼編譯並封裝成 JAR 檔案，以使用應用程式的程式碼。您可以使用 SBT 編譯和封裝程式碼：

   ```
   sbt assembly
   ```

1. 如果應用程式成功編譯，則會建立下列檔案：

   ```
   target/scala-3.2.0/getting-started-scala-1.0.jar
   ```

**上傳 Apache Flink 串流 Scala 程式碼**

在本節中，您會建立 Amazon S3 儲存貯體並上傳您應用程式的程式碼。

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇**建立儲存貯體**。

1. 在**儲存貯體名稱**欄位中，輸入 `ka-app-code-<username>`。新增尾碼至儲存貯體名稱，例如您的使用者名稱，使其成為全域唯一的。選擇**下一步**。

1. 在**設定選項**中，保留原有設定並選擇**下一步**。

1. 在**設定許可**步驟中，保留原有設定並選擇**下一步**。

1. 選擇**建立儲存貯體**。

1. 選擇 `ka-app-code-<username>` 儲存貯體，然後選擇**上傳**。

1. 在**選取檔案**步驟中，選擇 **新增檔案**。導覽至您在上一步驟中建立的 `getting-started-scala-1.0.jar` 檔案。

1. 您不需要變更物件的任何設定，因此請選擇**上傳**。

您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。