

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
<a name="get-started-exercise"></a>

このステップでは、Kinesis Data Streams をソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。

**Topics**
+ [依存リソースを作成する](#get-started-exercise-0)
+ [ローカルの開発環境のセットアップ](#get-started-exercise-2)
+ [Apache Flink Streaming Java Code のダウンロードと検証](#get-started-exercise-5)
+ [入力ストリームにサンプルレコードを書き込む](#get-started-exercise-5-4)
+ [アプリケーションをローカルで実行する](#get-started-exercise-5-run)
+ [Kinesis ストリームで入出力データを観察する](#get-started-exercise-input-output)
+ [ローカルで実行されているアプリケーションを停止する](#get-started-exercise-stop)
+ [アプリケーションコードをコンパイルしてパッケージ化する](#get-started-exercise-5-5)
+ [アプリケーションコードの JAR ファイルをアップロードする](#get-started-exercise-6)
+ [Managed Service for Apache Flink アプリケーションを作成して設定する](#get-started-exercise-7)
+ [次のステップ](#get-started-exercise-next-step-4)

## 依存リソースを作成する
<a name="get-started-exercise-0"></a>

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
+ 入力用と出力用に 2 つの Kinesis Data Streams。
+ アプリケーションのコードを保存する Amazon S3 バケット
**注記**  
このチュートリアルでは、アプリケーションを us-east-1 米国東部 (バージニア北部) リージョンにデプロイすることが前提とされます。別のリージョンを使用する場合、それに応じてすべてのステップを調整します。

### 2 つの Amazon Kinesis Data Streams を作成する
<a name="get-started-exercise-1"></a>

この演習で Apache Flink アプリケーションのマネージドサービスを作成する前に、2 つの Kinesis データストリーム (`ExampleInputStream` と `ExampleOutputStream`) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは、Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールの操作方法については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)」を参照してください。を使用してストリームを作成するには AWS CLI、次のコマンドを使用して、アプリケーションに使用するリージョンを調整します。

**データストリームを作成するには (AWS CLI)**

1. 最初のストリーム (`ExampleInputStream`) を作成するには、次の Amazon Kinesis `create-stream` AWS CLI コマンドを使用します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. アプリケーションが出力の書き込みに使用する 2 つ目のストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### アプリケーションコードの Amazon S3 バケットを作成する
<a name="get-started-exercise-1-5"></a>

Amazon S3 バケットは、コンソールを使用して作成できます。コンソールを使用して Amazon S3 バケットの作成の詳細については、「[Amazon S3 ユーザーガイド](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)」の「[Creating a bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)」を参照してください。ログイン名を追加するなど、グローバルに一意の名前を Amazon S3 バケットに付けます。

**注記**  
 このチュートリアルで使用するリージョン (us-east-1) でバケットを必ず作成してください。

### その他のリソース
<a name="get-started-exercise-1-6"></a>

アプリケーションを作成すると、Managed Service for Apache Flink によって次の Amazon CloudWatch リソースが自動的に作成されます (これらのリソースがまだ存在しない場合)。
+ `/AWS/KinesisAnalytics-java/<my-application>`という名前のロググループ。
+ `kinesis-analytics-log-stream` というログストリーム

## ローカルの開発環境のセットアップ
<a name="get-started-exercise-2"></a>

開発およびデバッグの場合、選択した IDE から直接マシンで Apache Flink アプリケーションを実行できます。Apache Flink の依存関係は、Apache Maven を使用して通常の Java の依存関係のように処理されます。

**注記**  
開発マシンには、Java JDK 11、Maven、Git がインストールされている必要があります。[Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) や [IntelliJ IDEA](https://www.jetbrains.com/idea/) などの開発環境を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、「[演習を完了するための前提条件を満たす](getting-started.md#setting-up-prerequisites)」を参照してください。マシンに Apache Flink クラスターをインストールする必要は**ありません**。

### AWS セッションを認証する
<a name="get-started-exercise-2-5"></a>

アプリケーションは Kinesis Data Streams を使用してデータを発行します。ローカルで実行する場合は、Kinesis データストリームに書き込むアクセス許可を持つ有効な AWS 認証済みセッションが必要です。次のステップに従って、セッションを認証します。

1.  AWS CLI と、有効な認証情報が設定された名前付きプロファイルがない場合は、「」を参照してください[AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)。

1.  AWS CLI が正しく設定されており、ユーザーが次のテストレコードを発行して Kinesis データストリームに書き込むアクセス許可を持っていることを確認します。

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. IDE に統合するプラグインがある場合は AWS、それを使用して IDE で実行されているアプリケーションに認証情報を渡すことができます。詳細については、「[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)」および「[AWS Toolkit for Eclipse](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)」を参照してください。

## Apache Flink Streaming Java Code のダウンロードと検証
<a name="get-started-exercise-5"></a>

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` ディレクトリに移動します。

### アプリケーションコンポーネントを確認する
<a name="get-started-exercise-5-1"></a>

アプリケーションは `com.amazonaws.services.msf.BasicStreamingJob` クラスで完全に実装されています。`main()` メソッドは、ストリーミングデータを処理して実行するデータフローを定義します。

**注記**  
最適化された開発者エクスペリエンスを IDE で開発するため、アプリケーションは Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに実行されるように設計されています。
+ Amazon Managed Service for Apache Flink および IDE で実行するときに動作するようにランタイム設定を読み取るため、アプリケーションは IDE でローカルにスタンドアロンとして実行されているかどうか自動的に検出します。この場合、アプリケーションはランタイム設定を異なる方法で読み込みます。

  1. アプリケーションが IDE でスタンドアロンモードで実行されていることが検出されたら、プロジェクトの **[リソース]** フォルダに含まれている `application_properties.json` ファイルを作成します。ファイルの内容は次のようになります。

  1. アプリケーションが Amazon Managed Service for Apache Flink で実行されると、デフォルトの動作により、Amazon Managed Service for Apache Flink アプリケーションで定義するランタイムプロパティからアプリケーション設定が読み込まれます。「[Managed Service for Apache Flink アプリケーションを作成して設定する](#get-started-exercise-7)」を参照してください。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` メソッドにより、アプリケーションのデータフローが定義されて実行されます。
  + デフォルトのストリーミング環境を初期化します。この例では、DataSteam API および `StreamTableEnvironment` で使用される両方の `StreamExecutionEnvironment` を作成し、SQL および Table API と使用する方法が示されます。2 つの環境オブジェクトは、異なる API を使用するための、同じランタイム環境への 2 つの別々のリファレンスです。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + アプリケーション設定パラメータを読み込みます。アプリケーションが実行されている場所に応じて、正しい場所から自動的にロードします。

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + アプリケーションは [Kinesis Consumer](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) コネクタを使用してソースを定義して、入力ストリームからデータを読み取ります。入力ストリームの設定は、`PropertyGroupId`=`InputStream0` で定義されています。ストリームの名前およびリージョンは、それぞれ `stream.name` および `aws.region` という名前のプロパティにあります。簡素化するため、このソースは文字列としてレコードを読み取ります。

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + 次に、アプリケーションは [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) コネクタを使用してシンクを定義し、出力ストリームにデータを送信します。出力ストリーム名およびリージョンは入力ストリームと同様に `PropertyGroupId`=`OutputStream0` で定義されます。シンクは、ソースからデータを取得している内部 `DataStream` に直接接続されています。実際のアプリケーションでは、ソースとシンクの間に何らかの変換があります。

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + 最後に、先ほど定義したデータフローを実行します。これは、データフローに必要なすべてのオペレータを定義した後の、`main()` メソッドの最後の命令である必要があります。

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### pom.xml ファイルを使用する
<a name="get-started-exercise-5-2"></a>

pom.xml ファイルによってアプリケーションに必要なすべての依存関係が定義され、Flink に必要なすべての依存関係を含む fat-jar を構築するため、Maven Shade プラグインが設定されます。
+ 一部の依存関係には `provided` スコープがあります。これらの依存関係は、アプリケーションが Amazon Managed Service for Apache Flink で実行されると自動的に利用可能となります。アプリケーションをコンパイルしたり、IDE でアプリケーションをローカルで実行したりするために必要です。詳細については、「[アプリケーションをローカルで実行する](#get-started-exercise-5-run)」を参照してください。Amazon Managed Service for Apache Flink で使用するランタイムと同じ Flink バージョンを使用していることを確認してください。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ このアプリケーションで使用される [Kinesis コネクタ](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)など、デフォルトのスコープを持つ pom に Apache Flink 依存関係を追加する必要があります。詳細については、「[Apache Flink コネクタを使用する](how-flink-connectors.md)」を参照してください。アプリケーションに必要なその他の Java 依存関係を追加することもできます。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Maven Java Compiler プラグインにより、Java 11 (Apache Flink で現在サポートされている JDK バージョン) に対してコードがコンパイルされていることが確認されます。
+ ランタイムによって提供される一部のライブラリを除き、Maven Shade プラグインによって fat-jar がパッケージ化されます。`ServicesResourceTransformer` および `ManifestResourceTransformer` という 2 つのトランスフォーマーも指定されます。後者は `main` メソッドを含むクラスを設定して、アプリケーションを起動します。メインクラスの名前を変更する場合、このトランスフォーマーを必ず更新してください。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 入力ストリームにサンプルレコードを書き込む
<a name="get-started-exercise-5-4"></a>

このセクションでは、アプリケーションが処理するサンプルレコードをストリームに送信します。サンプルデータを生成するオプションは 2 つあり、Python スクリプトまたは [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) のいずれかを使用します。

### Python スクリプトを使用してサンプルデータを生成する
<a name="get-started-exercise-5-4-1"></a>

Python スクリプトを使用して、サンプルレコードをストリームに送信できます。

**注記**  
この Python スクリプトを実行するには、Python 3.x を使用して [AWS SDK for Python (Boto)](https://aws.amazon.com/developer/language/python/) ライブラリがインストールされている必要があります。

**Kinesis 入力ストリームにテストデータの送信を開始する方法**

1. [Data Generator GitHub リポジトリ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator)から Data Generator `stock.py` Python スクリプトをダウンロードします。

1. `stock.py` スクリプトを実行します。

   ```
   $ python stock.py
   ```

チュートリアルの残りの部分を実践する間、スクリプトを実行し続けてください。Apache Flink アプリケーションを実行できるようになりました。

### Kinesis Data Generator を使用してサンプルデータを生成する
<a name="get-started-exercise-5-4-2"></a>

Python スクリプトを使用する代替手段として、[ホストバージョン](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)でも利用可能な [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) を使用し、ランダムなサンプルデータをストリームに送信できます。Kinesis Data Generator はブラウザで実行されるため、マシンに何もインストールする必要はありません。

**Kinesis Data Generator を設定して実行する方法**

1. 「[Kinesis Data Generator ドキュメント](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)」の指示に従って、ツールへのアクセスを設定します。ユーザーとパスワードを設定する CloudFormation テンプレートを実行します。

1. CloudFormation テンプレートによって生成された URL を介して Kinesis Data Generator にアクセスします。CloudFormation テンプレートが完了したら、**[出力]** タブに URL が表示されます。

1. Data Generator を設定します。
   + **リージョン:** このチュートリアルで使用しているリージョン (us-east-1) を選択します。
   + **ストリーム/配信ストリーム:** アプリケーションが使用する入力ストリーム (`ExampleInputStream`) を選択します。
   + **1 秒あたりのレコード数:** 100
   + **レコードテンプレート:** 次のテンプレートをコピーして貼り付けます。

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. テンプレートをテストする: **[テンプレートのテスト]** を選択し、生成されたレコードが次の内容と同じであることを確認してください。

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Data Generator を起動する: **[データ送信の選択]** を選択します。

現在、Kinesis Data Generator は `ExampleInputStream` にデータを送信しています。

## アプリケーションをローカルで実行する
<a name="get-started-exercise-5-run"></a>

Flink アプリケーションを IDE でローカルで実行およびデバッグできます。

**注記**  
続行する前に、入力ストリームと出力ストリームが利用できることを確認してください。「[2 つの Amazon Kinesis Data Streams を作成する](#get-started-exercise-1)」を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認してください。「[AWS セッションを認証する](#get-started-exercise-2-5)」を参照してください。  
ローカル開発環境をセットアップするには、Java 11 JDK、Apache Maven、Java 開発用の IDE が必要です。必要な前提条件を満たしていることを確認してください。「[演習を完了するための前提条件を満たす](getting-started.md#setting-up-prerequisites)」を参照してください。

### Java プロジェクトを IDE にインポートする
<a name="get-started-exercise-5-run-1"></a>

IDE でアプリケーションの使用を開始するには、Java プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、`./java/GettingStarted` サブディレクトリのコンテンツを IDE にインポートします。

Maven を使用して、コードを既存の Java プロジェクトとして挿入します。

**注記**  
新しい Java プロジェクトをインポートする正確なプロセスは、使用している IDE によって異なります。

### ローカルアプリケーション設定を確認する
<a name="get-started-exercise-5-run-2"></a>

ローカルで実行すると、アプリケーションでは `./src/main/resources` のプロジェクトのリソースフォルダにある `application_properties.json` ファイルの設定が使用されます。このファイルを編集して、異なる Kinesis ストリーム名またはリージョンを使用できます。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### IDE の実行設定を設定する
<a name="get-started-exercise-5-run-3"></a>

任意の Java アプリケーションを実行する場合と同様に、メインクラス `com.amazonaws.services.msf.BasicStreamingJob` を実行することで、IDE から直接 Flink アプリケーションを実行およびデバッグできます。アプリケーションを実行する前に、実行設定を設定する必要があります。セットアップは使用している IDE によって異なります。例えば、「IntelliJ IDEA ドキュメント」の「[Run/debug configurations](https://www.jetbrains.com/help/idea/run-debug-configuration.html)」を参照してください。特に、次の内容を設定する必要があります。

1. **クラスパスに `provided` 依存関係を追加します**。ローカルで実行するとき、`provided` スコープを持つ依存関係がアプリケーションに渡されることを確認するために必要です。この設定を行わないと、アプリケーションに `class not found` エラーが直ちに表示されます。

1. **Kinesis ストリームにアクセスするための AWS 認証情報をアプリケーションに渡します**。最も手軽な方法は、[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) を使用することです。実行設定でこの IDE プラグインを使用すると、特定の AWS プロファイルを選択できます。 AWS 認証は、このプロファイルを使用して行われます。 AWS 認証情報を直接渡す必要はありません。

1. IDE が **JDK 11** を使用してアプリケーションを実行することを確認してください。

### IDE でアプリケーションを実行する
<a name="get-started-exercise-5-run-4"></a>

`BasicStreamingJob` の実行設定を設定したら、通常の Java アプリケーションのように実行またはデバッグできます。

**注記**  
コマンドラインから `java -jar ...` を使用して、Maven によって生成される fat-jar を直接実行することはできません。この jar には、アプリケーションをスタンドアロンで実行するために必要な Flink コア依存関係は含まれていません。

アプリケーションが正常に起動すると、スタンドアロンのミニクラスターおよびコネクタの初期化に関する情報の一部がログ記録されます。この後、アプリケーションの起動時に Flink が通常出力する多数の INFO およびいくつかの WARN ログが続きます。

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

初期化が完了したら、アプリケーションはそれ以上のログエントリを出力しません。**データが流れている間、ログは出力されません。**

アプリケーションがデータを正しく処理しているかどうか確認するには、次のセクションで記述されているとおり、入出力の Kinesis ストリームを確認することができます。

**注記**  
 フローデータに関するログが出力されないことは、Flink アプリケーションの通常の動作です。すべてのレコードに関するログが出力されるとデバッグに便利ですが、実稼働での実行時にかなりのオーバーヘッドが発生する可能性があります。

## Kinesis ストリームで入出力データを観察する
<a name="get-started-exercise-input-output"></a>

Amazon Kinesis コンソールで **[データビューワー]** を使用することで、(生成サンプル Python) または Amazon Kinesis Data Generator (リンク) によって入力ストリームに送信されるレコードを観察できます。

**レコードを観察する方法**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. リージョンが、このチュートリアルを実行しているリージョンと同じであることを確認してください。デフォルトでは us-east-1 米国東部 (バージニア北部) です。リージョンが一致しない場合は変更します。

1. **[データストリーム]** を選択します。

1. `ExampleInputStream` または `ExampleOutputStream.` のいずれか、観察するストリームを選択します。

1. **[データビューワー]** タブを選択します。

1. 任意の **[シャード]** を選択し、**[最新]** を **[開始位置]** のままにして **[レコードを取得]** を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、**[レコードの取得を再試行]** を選択します。ストリームディスプレイに発行された最新レコード。

1. データ列の値を選択すると、レコードの内容を JSON 形式で確認できます。

## ローカルで実行されているアプリケーションを停止する
<a name="get-started-exercise-stop"></a>

IDE で実行されているアプリケーションを停止します。　 通常、IDE には「停止」オプションがあります。正確な場所および方法は、お使いの IDE によって異なります。

## アプリケーションコードをコンパイルしてパッケージ化する
<a name="get-started-exercise-5-5"></a>

このセクションでは、Apache Maven を使用して Java コードをコンパイルし、JAR ファイルにパッケージ化します。Maven コマンドラインツールまたは IDE を使用して、コードをコンパイルおよびパッケージ化できます。

**Maven コマンドラインを使用してコンパイルおよびパッケージ化する方法**

Java GettingStarted プロジェクトを含むディレクトリに移動し、次のコマンドを実行します。

```
$ mvn package
```

**IDE を使用してコンパイルおよびパッケージ化する方法**

IDE Maven 統合から `mvn package` を実行します。

どちらの場合でも、次の JAR ファイルが作成されます: `target/amazon-msf-java-stream-app-1.0.jar`。

**注記**  
 IDE から「ビルドプロジェクト」を実行しても、JAR ファイルが作成されない場合があります。

## アプリケーションコードの JAR ファイルをアップロードする
<a name="get-started-exercise-6"></a>

このセクションでは、前のセクションで作成した JAR ファイルを、このチュートリアルの冒頭で作成した Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。このステップを完了していない場合、「(リンク)」を参照してください。

**アプリケーションコードの JAR ファイルをアップロードする方法**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アプリケーションコード用に以前作成したバケットを選択します。

1. **アップロード**を選択します。

1. **ファイルの追加**を選択します。

1. 前のステップで生成された JAR ファイルに移動します (`target/amazon-msf-java-stream-app-1.0.jar`)。

1. 他の設定を変更せずに **[アップロード]** を選択します。

**警告**  
`<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar` で正しい JAR ファイルを選択していることを確認してください。  
`target` ディレクトリには、アップロードする必要のない他の JAR ファイルも含まれています。

## Managed Service for Apache Flink アプリケーションを作成して設定する
<a name="get-started-exercise-7"></a>

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。このチュートリアルでは、コンソールを使用します。

**注記**  
コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) リソースと Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

**Topics**
+ [アプリケーションの作成](#get-started-exercise-7-console-create)
+ [IAM ポリシーを編集する](#get-started-exercise-7-console-iam)
+ [アプリケーションを設定する](#get-started-exercise-7-console-configure)
+ [アプリケーションを実行する](#get-started-exercise-7-console-run)
+ [実行中のアプリケーションのメトリクスを観察する](#get-started-exercise-7-console-stop)
+ [Kinesis ストリームの出力データを観察する](#get-started-exercise-7-console-output)
+ [アプリケーションを停止する](#get-started-exercise-stop)

### アプリケーションの作成
<a name="get-started-exercise-7-console-create"></a>

**アプリケーションを作成するには**

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. 正しいリージョンが選択されていることを確認してください: us-east-1 米国東部 (バージニア北部)

1. 右側のメニューを開いて **[Apache Flink アプリケーション]** を選択し、**[ストリーミングアプリケーションの作成]** を選択します。または、最初のページの入門コンテナで **[ストリーミングアプリケーションの作成]** を選択します。

1. **[ストリーミングアプリケーションの作成]** ページで、次の操作を行います。
   + **[ストリーム処理アプリケーションの設定方法の選択]** で **[最初から作成]** を選択します。
   + **[Apache Flink の設定、Application Flink バージョン]** で **[Apache Flink 1.20]** を選択します。

1. アプリケーションを設定する
   + **[アプリケーション名]** に「**MyApplication**」と入力します。
   + **[説明]** に「**My java test app**」と入力します。
   + **[アプリケーションリソースへのアクセス]** で **[必要なポリシーを使用して IAM ロール `kinesis-analytics-MyApplication-us-east-1` を作成/更新]** を選択します。

1. **[アプリケーション設定用テンプレート]** を設定します
   + **[テンプレート]** で **[開発]** を選択します。

1. ページの下部にある **[ストリーミングアプリケーションの作成]** を選択します。

**注記**  
コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-MyApplication-us-east-1`
ロール: `kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service for Apache Flink は、以前は Kinesis Data Analytics と呼ばれていました。自動的に生成されるリソースの名前には、下位互換性のために「`kinesis-analytics-`」のプレフィックスが付きます。

### IAM ポリシーを編集する
<a name="get-started-exercise-7-console-iam"></a>

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

**ポリシーを編集する方法**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-east-1`** ポリシーを選択します。

1. **[編集]** を選択して、**[JSON]** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (*012345678901*) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  ページの下部にある **[次へ]** を選択して、**[変更を保存]** を選択します。

### アプリケーションを設定する
<a name="get-started-exercise-7-console-configure"></a>

アプリケーション設定を編集して、アプリケーションコードアーティファクトを設定します。

**設定を編集する方法**

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. **[アプリケーションコードの場所]** セクションで、次の操作を行います。
   + **[Amazon S3 バケット]** には、アプリケーションコード用に以前作成したバケットを選択します。**[参照]** を選択して正しいバケットを選択したら、**[選択]** を選択します。バケット名はクリックしないでください。
   + [**Amazon S3 オブジェクトへのパス**] で、**amazon-msf-java-stream-app-1.0.jar**と入力します。

1. **[アクセス許可]** には、**[必要なポリシーで IAM ロール `kinesis-analytics-MyApplication-us-east-1` を作成/更新]** を選択します。

1. **[ランタイムプロパティ]** セクションで、次のプロパティを追加します。

1. **[新しい項目の追加]** を選択して、次のパラメータをすべて追加します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/get-started-exercise.html)

1. 他のセクションは変更しないでください。

1. **[Save changes]** (変更の保存) をクリックします。

**注記**  
Amazon CloudWatch ログ記録を有効にすることを選択すると、ロググループとログストリームが Kinesis Data Analytics によって作成されます。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`

### アプリケーションを実行する
<a name="get-started-exercise-7-console-run"></a>

これでアプリケーションが設定され、実行する準備が整いました。

**アプリケーションを実行するには**

1. Amazon Managed Service for Apache Flink のコンソールで、**[自分のアプリケーション]** を選択して **[実行]** を選択します。

1. 次のページのアプリケーション復元設定ページで、**[最新のスナップショットで実行]** を選択したら、**[実行]** を選択します。

   **[アプリケーション詳細]** の **[ステータス]** は「`Ready`」から「`Starting`」に移行し、アプリケーションが起動されると「`Running`」に移行します。

アプリケーションが「`Running`」ステータスのとき、Flink ダッシュボードを開けるようになります。

**ダッシュボードを開くには**

1. **[Open Apache Flink ダッシュボード]** を選択します。ダッシュボードは新しいページで開かれます。

1. **[実行中のジョブ]** リストで、表示されている 1 つのジョブを選択します。
**注記**  
ランタイムプロパティを設定したり、IAM ポリシーを誤って編集したりすると、アプリケーションのステータスが「`Running`」になることがありますが、Flink ダッシュボードではジョブが継続的に再起動されていることが表示されます。これは、アプリケーションの設定が間違っているか、外部リソースへのアクセス許可がない場合の一般的な障害シナリオです。  
これが発生した場合、Flink ダッシュボードの **[例外]** タブを見て、問題の原因を確認してください。

### 実行中のアプリケーションのメトリクスを観察する
<a name="get-started-exercise-7-console-stop"></a>

**[MyApplication]** ページの **[Amazon CloudWatch メトリクス]** セクションで、実行中のアプリケーションの基本的なメトリクスの一部を確認できます。

**メトリクスを表示する方法**

1. **[更新]** ボタンの横にあるドロップダウンリストから **[10 秒]** を選択します。

1. アプリケーションが実行中で正常なとき、**[アップタイム]** メトリクスが継続的に増加していることを確認できます。

1. **[完全再起動]** メトリクスはゼロである必要があります。増加している場合、設定に問題がある可能性があります。問題を調査するには、Flink ダッシュボードの **[例外]** タブを確認してください。

1. 正常なアプリケーションでは、**[失敗したチェックポイント数]** メトリクスは 0 です。
**注記**  
このダッシュボードには、5 分の粒度で一定の一連のメトリクスが表示されます。CloudWatch ダッシュボードで任意のメトリクスを使用してカスタムアプリケーションのダッシュボードを作成できます。

### Kinesis ストリームの出力データを観察する
<a name="get-started-exercise-7-console-output"></a>

Python スクリプトまたは Kinesis Data Generator のいずれかを使用して、入力にデータを引き続き発行していることを確認してください。

「[https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)」のデータビューワーを使用 (以前に行った内容と同様に) することで、Managed Service for Apache Flink で実行されているアプリケーションの出力を観察できるようになりました。

**出力の表示方法**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. リージョンが、このチュートリアルの実行に使用しているリージョンと同じであることを確認してください。デフォルトでは、us-east-1 米国東部 (バージニア北部) です。必要に応じてリージョンを変更します。

1. **[データストリーム]** を選択します。

1. 観察するストリームを選択します。このチュートリアルでは、`ExampleOutputStream` を使用します。

1.  **[データビューワー]** タブを選択します。

1. 任意の **[シャード]** を選択し、**[最新]** を **[開始位置]** のままにして **[レコードの取得]** を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、**[レコードの取得を再試行]** を選択します。ストリームディスプレイに発行された最新レコード。

1. データ列の値を選択して、レコードの内容を JSON 形式で確認します。

### アプリケーションを停止する
<a name="get-started-exercise-stop"></a>

アプリケーションを停止するには、`MyApplication` という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動します。

**アプリケーションを停止するには**

1. **[アクション]** ドロップダウンリストで、**[停止]** を選択します。

1. **[アプリケーション詳細]** の **[ステータス]** は「`Running`」から「`Stopping`」に移行し、アプリケーションが完全に停止すると「`Ready`」に移行します。
**注記**  
Python スクリプトまたは Kinesis Data Generator から入力ストリームへのデータ送信も必ず停止してください。

## 次のステップ
<a name="get-started-exercise-next-step-4"></a>

[AWS リソースをクリーンアップする](getting-started-cleanup.md)