

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 開始方法 (Scala)
<a name="examples-gs-scala"></a>

**注記**  
バージョン 1.15 以降、Flink は Scala フリーになりました。アプリケーションが Scala の任意のバージョンから Java API を使用できるようになっています。Flink はまだ内部的にいくつかの主要コンポーネントで Scala を使用していますが、Scala をユーザーコードのクラスローダーに公開していません。そのため、Scala の依存関係を JAR アーカイブに追加する必要があります。  
Flink 1.15 での Scala の変更についての詳しい情報は、[Scala Free in One Fifteen](https://flink.apache.org/2022/02/22/scala-free.html)を参照してください。

この演習では、Kinesis ストリームをソースおよびシンクとして使用して、Scala 用の Managed Service for Apache Flink アプリケーションを作成します。

**Topics**
+ [依存リソースを作成する](#examples-gs-scala-resources)
+ [入力ストリームにサンプルレコードを書き込む](#examples-gs-scala-write)
+ [アプリケーションコードをダウンロードして調べる](#examples-gs-scala-download)
+ [アプリケーション・コードをコンパイルしてアップロードするには](#examples-gs-scala-upload)
+ [アプリケーションを作成して実行する (コンソール)](gs-scala-7.md)
+ [アプリケーションの作成と実行 (CLI)](examples-gs-scala-create-run-cli.md)
+ [AWS リソースをクリーンアップする](examples-gs-scala-cleanup.md)

## 依存リソースを作成する
<a name="examples-gs-scala-resources"></a>

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
+ 入力用と出力用の 2 つの Kinesis ストリーム。
+ アプリケーションのコードを保存するためのAmazon S3バケット (`ka-app-code-<username>`) 

Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。
+ 「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)」。データストリーム**ExampleInputStream**と**ExampleOutputStream**に名前を付けます。

  データストリームを作成するには (AWS CLI)
  + 最初のストリーム (`ExampleInputStream`) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

    ```
    aws kinesis create-stream \
        --stream-name ExampleInputStream \
        --shard-count 1 \
        --region us-west-2 \
        --profile adminuser
    ```
  + アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

    ```
    aws kinesis create-stream \
        --stream-name ExampleOutputStream \
        --shard-count 1 \
        --region us-west-2 \
        --profile adminuser
    ```
+ 「*Amazon Simple Storage Service ユーザーガイド*」の「[How Do I Create an S3 Bucket?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)」。ログイン名 (**ka-app-code-*<username>*** など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。

**その他のリソース**

アプリケーションを作成すると、Apache Flink 用 Managed Service によって次の Amazon CloudWatch リソースが作成されます（これらのリソースがまだ存在しない場合）。
+ `/AWS/KinesisAnalytics-java/MyApplication`という名前のロググループ。
+ `kinesis-analytics-log-stream` というログストリーム

## 入力ストリームにサンプルレコードを書き込む
<a name="examples-gs-scala-write"></a>

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

**注記**  
このセクションでは [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/) が必要です。

**注記**  
このセクションの Python スクリプトでは、 AWS CLIを使用しています。アカウント認証情報とデフォルトのリージョンを使用する AWS CLI ように を設定する必要があります。を設定するには AWS CLI、次のように入力します。  

```
aws configure
```

1. 次の内容で、`stock.py` という名前のファイルを作成します。

   ```
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           'event_time': datetime.datetime.now().isoformat(),
           'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
           'price': round(random.random() * 100, 2)}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(data),
               PartitionKey="partitionkey")
   
   
   if __name__ == '__main__':
       generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
   ```

1. `stock.py` スクリプトを実行します。

   ```
   $ python stock.py
   ```

   チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。

## アプリケーションコードをダウンロードして調べる
<a name="examples-gs-scala-download"></a>

この例の Python アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「[Git のインストール](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)」をご参照ください。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. `amazon-kinesis-data-analytics-java-examples/scala/GettingStarted` ディレクトリに移動します。

アプリケーションコードに関して、以下の点に注意してください。
+ `build.sbt`ファイルには、Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。
+ この `BasicStreamingJob.scala` ファイルには、アプリケーションの機能を定義するメインメソッドが含まれています。
+ アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis ソースが作成されます。

  ```
  private def createSource: FlinkKinesisConsumer[String] = {
    val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
    val inputProperties = applicationProperties.get("ConsumerConfigProperties")
  
    new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName),
      new SimpleStringSchema, inputProperties)
  }
  ```

  また、アプリケーションは Kinesis シンクを使用して結果ストリームに書き込みます。次のスニペットでは、Kinesis シンクが作成されます。

  ```
  private def createSink: KinesisStreamsSink[String] = {
    val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
    val outputProperties = applicationProperties.get("ProducerConfigProperties")
  
    KinesisStreamsSink.builder[String]
      .setKinesisClientProperties(outputProperties)
      .setSerializationSchema(new SimpleStringSchema)
      .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
      .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
      .build
  }
  ```
+ アプリケーションでは、ソースおよびシンクコネクタを作成し、 オブジェクトを使用して外部リソースにアクセスします。
+ アプリケーションは、動的アプリケーションプロパティを使用してソースコネクタとシンクコネクタを作成します。アプリケーションのプロパティを読み取ってコネクタを設定します。ランタイムプロパティの詳細については、[ランタイムプロパティ](https://docs.aws.amazon.com/managed-flink/latest/java/how-properties.html)を参照してください。

## アプリケーション・コードをコンパイルしてアップロードするには
<a name="examples-gs-scala-upload"></a>

このセクションでは、アプリケーションコードをコンパイルし、[依存リソースを作成する](#examples-gs-scala-resources) セクションで作成したAmazon S3バケットにアップロードします。

**アプリケーションコードのコンパイル**

このセクションでは、「[SBT](https://www.scala-sbt.org/)」 ビルド・ツールを使用してアプリケーションの Scala コードをビルドします。SBTをインストールするには、[Install sbt with cs setup](https://www.scala-sbt.org/download.html)を参照してください。また、Java 開発キット（JDK）をインストールする必要があります。[演習を完了するための前提条件](https://docs.aws.amazon.com/managed-flink/latest/java/getting-started.html#setting-up-prerequisites) を参照してください。

1. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。SBT を使用してコードをコンパイルしてパッケージ化できます。

   ```
   sbt assembly
   ```

1. アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

   ```
   target/scala-3.2.0/getting-started-scala-1.0.jar
   ```

**Apache Flink Streaming Scala Code のアップロード**

このセクションでは、Amazon S3 バケットを作成し、アプリケーションコードをアップロードします。

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. [**バケットを作成**] を選択します。

1. [**Bucket name (バケット名)**] フィールドに`ka-app-code-<username>`と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[**次へ**] を選択します。

1. **設定オプション**のステップでは、設定をそのままにし、[**次へ**] を選択します。

1. **アクセス許可の設定**のステップでは、設定をそのままにし、[**次へ**] を選択します。

1. **[バケットを作成]** を選択します。

1. `ka-app-code-<username>`バケットを選択し、**アップロード** を選択します。

1. **ファイルの選択**のステップで、[**ファイルを追加**] を選択します。前のステップで作成した `getting-started-scala-1.0.jar` ファイルに移動します。

1. オブジェクトの設定を変更する必要はないので、[**アップロード**] を選択してください。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。