

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Managed Service for Apache Flink (DataStream API) の概要
<a name="getting-started"></a>

このセクションでは、Managed Service for Apache Flink に関する基本概念と DataStream API を使用した Java でのアプリケーションの実装について紹介します。アプリケーションの作成とテストに使用できるオプションについて説明します。また、このガイドのチュートリアルを完了し、初めてアプリケーションを作成するのに必要なツールのインストール方法についても説明します。

**Topics**
+ [

## Managed Service for Apache Flink アプリケーションのコンポーネントを確認する
](#getting-started-components)
+ [

## 演習を完了するための前提条件を満たす
](#setting-up-prerequisites)
+ [

# AWS アカウントをセットアップし、管理者ユーザーを作成する
](setting-up.md)
+ [

# AWS Command Line Interface (AWS CLI) のセットアップ
](setup-awscli.md)
+ [

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
](get-started-exercise.md)
+ [

# AWS リソースをクリーンアップする
](getting-started-cleanup.md)
+ [

# 他のリソースを調べる
](getting-started-next-steps.md)

## Managed Service for Apache Flink アプリケーションのコンポーネントを確認する
<a name="getting-started-components"></a>

**注記**  
Amazon Managed Service for Apache Flink はすべての Apache Flink API をサポートし、場合によってはすべての JVM 言語もサポートします。詳細については、「[Flink's APIs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/overview/#flinks-apis)」を参照してください。  
選択した API に応じて、アプリケーションの構造および実装が若干異なります。この入門チュートリアルでは、Java における DataStream API を使用したアプリケーションの実装について説明します。

データを処理するため、Managed Service for Apache Flink アプリケーションでは、Apache Flink ランタイムを使用して入力を処理し、出力を生成する Java アプリケーションが使用されます。

通常の Managed Service for Apache Flink アプリケーションには、次のコンポーネントがあります。
+ **ランタイムプロパティ:** *ランタイムプロパティ*を使用して、コードを変更して再発行することなく、設定パラメータをアプリケーションに渡して変更することができます。
+ **ソース:** アプリケーションは 1 つ以上の*ソース*からデータを消費します。ソースは[コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/overview/)を使用して、Kinesis データストリームや Kafka バケットなどの外部システムからデータを読み込みます。詳細については、「[ストリーミングデータソースを追加する](how-sources.md)」を参照してください。
+ **オペレーター:** アプリケーションは 1 つ以上の *オペレーター* を使用してデータを処理します。オペレータはデータを変換、強化、または集約できます。詳細については、「[オペレータ](how-operators.md)」を参照してください。
+ **シンク:** アプリケーションにより、*シンク*を通じて外部ソースにデータが送信されます。シンクは[コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/overview/) v を使用して、Kinesis データストリーム、Kafka トピック、Amazon S3、リレーショナルデータベースにデータを送信します。特別なコネクタを使用して、開発目的としてのみ出力データを出力することもできます。詳細については、「[シンクを使用してデータを書き込む](how-sinks.md)」を参照してください。

アプリケーションには、アプリケーションが使用する Flink コネクタ (場合によっては Java ライブラリ) などの*外部依存関係*が一部必要です。Amazon Managed Service for Apache Flink で実行するには、アプリケーションは *fat-jar* で依存関係と一緒にパッケージ化され、Amazon S3 バケットにアップロードされる必要があります。次に、Apache Flink アプリケーション用 Managed Serviceを作成します。コードパッケージの場所は、他のランタイム設定パラメータと一緒に渡します。

このチュートリアルでは、Apache Maven を使用してアプリケーションをパッケージ化する方法、ならびに選択した IDE でアプリケーションをローカルで実行する方法について説明します。

## 演習を完了するための前提条件を満たす
<a name="setting-up-prerequisites"></a>

このガイドの手順を完了するには、以下が必要です。
+ [Git クライアント](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。Git クライアントをまだインストールしていない場合、インストールします。
+ [ Java Development Kit (JDK) バージョン 11。 ](https://www.oracle.com/java/technologies/downloads/#java11)Java JDK 11 をインストールして、`JAVA_HOME` 環境変数が JDK のインストール場所を指すように設定します。JDK 11 がない場合、[Amazon Coretto 11](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/what-is-corretto-11.html) または他の任意の標準 JDK を使用できます。
  + JDK が正しくインストールされていることを確認するには、次のコマンドを実行します。Amazon Corretto 以外の JDK を使用している場合、出力は異なります。バージョンが 11.x であることを確認してください。

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/)。Apache Maven をまだインストールしていない場合、インストールします。インストール方法の詳細については、「[Installing Apache Maven](https://maven.apache.org/install.html)」を参照してください。
  + Apache Maven のインストールをテストするには、次のように入力します。

  ```
  $ mvn -version
  ```
+ ローカル開発用の IDE。開発環境 ([Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) や [IntelliJ IDEA](https://www.jetbrains.com/idea/) など) を使用し、アプリケーションを開発してコンパイルすることをお勧めします。
  + Apache Maven のインストールをテストするには、次のように入力します。

  ```
  $ mvn -version
  ```

開始するには、[AWS アカウントをセットアップし、管理者ユーザーを作成する](setting-up.md)に進みます。

# AWS アカウントをセットアップし、管理者ユーザーを作成する
<a name="setting-up"></a>

Managed Service for Apache Flink を初めて使用する前に、以下のタスクを完了してください：

## にサインアップする AWS アカウント
<a name="sign-up-for-aws"></a>

がない場合は AWS アカウント、次の手順を実行して作成します。

**にサインアップするには AWS アカウント**

1. [https://portal.aws.amazon.com/billing/signup](https://portal.aws.amazon.com/billing/signup) を開きます。

1. オンラインの手順に従います。

   サインアップ手順の一環として、電話またはテキストメッセージを受け取り、電話キーパッドで検証コードを入力します。

   にサインアップすると AWS アカウント、 *AWS アカウントのルートユーザー* が作成されます。ルートユーザーには、アカウントのすべての AWS のサービス とリソースへのアクセス権があります。セキュリティベストプラクティスとして、ユーザーに管理アクセス権を割り当て、[ルートユーザーアクセスが必要なタスク](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks)の実行にはルートユーザーのみを使用するようにしてください。

AWS サインアッププロセスが完了すると、 から確認メールが送信されます。[https://aws.amazon.com/](https://aws.amazon.com/) の **[マイアカウント]** をクリックして、いつでもアカウントの現在のアクティビティを表示し、アカウントを管理することができます。

## 管理アクセスを持つユーザーを作成する
<a name="create-an-admin"></a>

にサインアップしたら AWS アカウント、日常的なタスクにルートユーザーを使用しないように AWS アカウントのルートユーザー、 のセキュリティを確保し AWS IAM アイデンティティセンター、 を有効にして管理ユーザーを作成します。

**を保護する AWS アカウントのルートユーザー**

1.  **ルートユーザー**を選択し、 AWS アカウント E メールアドレスを入力して、アカウント所有者[AWS マネジメントコンソール](https://console.aws.amazon.com/)として にサインインします。次のページでパスワードを入力します。

   ルートユーザーを使用してサインインする方法については、「*AWS サインイン ユーザーガイド*」の「[ルートユーザーとしてサインインする](https://docs.aws.amazon.com/signin/latest/userguide/console-sign-in-tutorials.html#introduction-to-root-user-sign-in-tutorial)」を参照してください。

1. ルートユーザーの多要素認証 (MFA) を有効にします。

   手順については、*IAM* [ユーザーガイドの AWS アカウント 「ルートユーザー (コンソール) の仮想 MFA デバイス](https://docs.aws.amazon.com/IAM/latest/UserGuide/enable-virt-mfa-for-root.html)を有効にする」を参照してください。

**管理アクセスを持つユーザーを作成する**

1. IAM アイデンティティセンターを有効にします。

   手順については、「*AWS IAM アイデンティティセンター ユーザーガイド*」の「[AWS IAM アイデンティティセンターの有効化](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-set-up-for-idc.html)」を参照してください。

1. IAM アイデンティティセンターで、ユーザーに管理アクセスを付与します。

   を ID ソース IAM アイデンティティセンターディレクトリ として使用する方法のチュートリアルについては、「 *AWS IAM アイデンティティセンター ユーザーガイド*」の[「デフォルトを使用してユーザーアクセスを設定する IAM アイデンティティセンターディレクトリ](https://docs.aws.amazon.com//singlesignon/latest/userguide/quick-start-default-idc.html)」を参照してください。

**管理アクセス権を持つユーザーとしてサインインする**
+ IAM アイデンティティセンターのユーザーとしてサインインするには、IAM アイデンティティセンターのユーザーの作成時に E メールアドレスに送信されたサインイン URL を使用します。

  IAM Identity Center ユーザーを使用してサインインする方法については、*AWS サインイン 「 ユーザーガイド*[」の AWS 「 アクセスポータルにサインイン](https://docs.aws.amazon.com/signin/latest/userguide/iam-id-center-sign-in-tutorial.html)する」を参照してください。

**追加のユーザーにアクセス権を割り当てる**

1. IAM アイデンティティセンターで、最小特権のアクセス許可を適用するというベストプラクティスに従ったアクセス許可セットを作成します。

   手順については、「*AWS IAM アイデンティティセンター ユーザーガイド*」の「[アクセス許可セットを作成する](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-started-create-a-permission-set.html)」を参照してください。

1. グループにユーザーを割り当て、そのグループにシングルサインオンアクセス権を割り当てます。

   手順については、「*AWS IAM アイデンティティセンター ユーザーガイド*」の「[グループを追加する](https://docs.aws.amazon.com//singlesignon/latest/userguide/addgroups.html)」を参照してください。

## プログラム的なアクセス権を付与する
<a name="setting-up-access"></a>

ユーザーが の AWS 外部とやり取りする場合は、プログラムによるアクセスが必要です AWS マネジメントコンソール。プログラムによるアクセスを許可する方法は、 がアクセスするユーザーのタイプによって異なります AWS。

ユーザーにプログラムによるアクセス権を付与するには、以下のいずれかのオプションを選択します。


****  

| プログラムによるアクセス権を必要とするユーザー | 目的 | 方法 | 
| --- | --- | --- | 
| IAM | (推奨) コンソール認証情報を一時的な認証情報として使用して AWS CLI、、 AWS SDKs、または AWS APIs。 |  使用するインターフェイスの指示に従ってください。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/setting-up.html)  | 
|  ワークフォースアイデンティティ (IAM アイデンティティセンターで管理されているユーザー)  | 一時的な認証情報を使用して AWS CLI、、 AWS SDKs、または AWS APIs。 |  使用するインターフェイスの指示に従ってください。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/setting-up.html)  | 
| IAM | 一時的な認証情報を使用して AWS CLI、、 AWS SDKs、または AWS APIs。 | 「IAM [ユーザーガイド」の「 AWS リソースでの一時的な認証情報](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html)の使用」の手順に従います。 | 
| IAM | (非推奨)長期認証情報を使用して、 AWS CLI、 AWS SDKs、または AWS APIs。 |  使用するインターフェイスの指示に従ってください。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/setting-up.html)  | 

## 次のステップ
<a name="setting-up-next-step-2"></a>

[AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)

# AWS Command Line Interface (AWS CLI) のセットアップ
<a name="setup-awscli"></a>

このステップでは、 Managed Service for Apache Flink で使用する をダウンロードして設定 AWS CLI します。

**注記**  
このガイドの使用開始実習では、操作を実行するために、アカウントの管理者の認証情報 (`adminuser`) を使用していることが前提となっています。

**注記**  
が既に AWS CLI インストールされている場合は、アップグレードして最新の機能を取得する必要がある場合があります。詳細については、「*AWS Command Line Interface ユーザーガイド*」の「[Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html)」を参照してください。のバージョンを確認するには AWS CLI、次のコマンドを実行します。  

```
aws --version
```
このチュートリアルの演習では、次の AWS CLI バージョン 以降が必要です。  

```
aws-cli/1.16.63
```

**を設定するには AWS CLI**

1.  AWS CLIをダウンロードして設定します。手順については、*「AWS Command Line Interface ユーザーガイド」*の次のトピックを参照してください。
   + [AWS Command Line Interfaceのインストール](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [AWS CLIの設定](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. 管理者ユーザーの名前付きプロファイルを `config` ファイルに追加します AWS CLI 。 AWS CLI コマンドを実行するときに、このプロファイルを使用します。名前付きプロファイルの詳細については、*AWS Command Line Interface ユーザーガイド*の[名前付きプロファイル](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html)を参照してください。

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   使用可能な AWS リージョンのリストについては、の[「リージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html)」を参照してください*Amazon Web Services 全般のリファレンス*。
**注記**  
このチュートリアルのコード例およびコマンドは、us-east-1 米国東部 (バージニア北部) リージョンを使用します。別の リージョンを使用するには、このチュートリアルのコードとコマンドのリージョンを、使用したいリージョンに変更します。

1. コマンドプロンプトで以下のヘルプコマンドを入力して、セットアップを確認します。

   ```
   aws help
   ```

 AWS アカウントと を設定したら AWS CLI、次の演習を試すことができます。この演習では、サンプルアプリケーションを設定し、end-to-endのセットアップをテストします。

## 次のステップ
<a name="setup-awscli-next-step-3"></a>

[Apache Flink アプリケーション用 Managed Serviceを作成して実行する](get-started-exercise.md)

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
<a name="get-started-exercise"></a>

このステップでは、Kinesis Data Streams をソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。

**Topics**
+ [

## 依存リソースを作成する
](#get-started-exercise-0)
+ [

## ローカルの開発環境のセットアップ
](#get-started-exercise-2)
+ [

## Apache Flink Streaming Java Code のダウンロードと検証
](#get-started-exercise-5)
+ [

## 入力ストリームにサンプルレコードを書き込む
](#get-started-exercise-5-4)
+ [

## アプリケーションをローカルで実行する
](#get-started-exercise-5-run)
+ [

## Kinesis ストリームで入出力データを観察する
](#get-started-exercise-input-output)
+ [

## ローカルで実行されているアプリケーションを停止する
](#get-started-exercise-stop)
+ [

## アプリケーションコードをコンパイルしてパッケージ化する
](#get-started-exercise-5-5)
+ [

## アプリケーションコードの JAR ファイルをアップロードする
](#get-started-exercise-6)
+ [

## Managed Service for Apache Flink アプリケーションを作成して設定する
](#get-started-exercise-7)
+ [

## 次のステップ
](#get-started-exercise-next-step-4)

## 依存リソースを作成する
<a name="get-started-exercise-0"></a>

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
+ 入力用と出力用に 2 つの Kinesis Data Streams。
+ アプリケーションのコードを保存する Amazon S3 バケット
**注記**  
このチュートリアルでは、アプリケーションを us-east-1 米国東部 (バージニア北部) リージョンにデプロイすることが前提とされます。別のリージョンを使用する場合、それに応じてすべてのステップを調整します。

### 2 つの Amazon Kinesis Data Streams を作成する
<a name="get-started-exercise-1"></a>

この演習で Apache Flink アプリケーションのマネージドサービスを作成する前に、2 つの Kinesis データストリーム (`ExampleInputStream` と `ExampleOutputStream`) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは、Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールの操作方法については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)」を参照してください。を使用してストリームを作成するには AWS CLI、次のコマンドを使用して、アプリケーションに使用するリージョンを調整します。

**データストリームを作成するには (AWS CLI)**

1. 最初のストリーム (`ExampleInputStream`) を作成するには、次の Amazon Kinesis `create-stream` AWS CLI コマンドを使用します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. アプリケーションが出力の書き込みに使用する 2 つ目のストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### アプリケーションコードの Amazon S3 バケットを作成する
<a name="get-started-exercise-1-5"></a>

Amazon S3 バケットは、コンソールを使用して作成できます。コンソールを使用して Amazon S3 バケットの作成の詳細については、「[Amazon S3 ユーザーガイド](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)」の「[Creating a bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)」を参照してください。ログイン名を追加するなど、グローバルに一意の名前を Amazon S3 バケットに付けます。

**注記**  
 このチュートリアルで使用するリージョン (us-east-1) でバケットを必ず作成してください。

### その他のリソース
<a name="get-started-exercise-1-6"></a>

アプリケーションを作成すると、Managed Service for Apache Flink によって次の Amazon CloudWatch リソースが自動的に作成されます (これらのリソースがまだ存在しない場合)。
+ `/AWS/KinesisAnalytics-java/<my-application>`という名前のロググループ。
+ `kinesis-analytics-log-stream` というログストリーム

## ローカルの開発環境のセットアップ
<a name="get-started-exercise-2"></a>

開発およびデバッグの場合、選択した IDE から直接マシンで Apache Flink アプリケーションを実行できます。Apache Flink の依存関係は、Apache Maven を使用して通常の Java の依存関係のように処理されます。

**注記**  
開発マシンには、Java JDK 11、Maven、Git がインストールされている必要があります。[Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) や [IntelliJ IDEA](https://www.jetbrains.com/idea/) などの開発環境を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、「[演習を完了するための前提条件を満たす](getting-started.md#setting-up-prerequisites)」を参照してください。マシンに Apache Flink クラスターをインストールする必要は**ありません**。

### AWS セッションを認証する
<a name="get-started-exercise-2-5"></a>

アプリケーションは Kinesis Data Streams を使用してデータを発行します。ローカルで実行する場合は、Kinesis データストリームに書き込むアクセス許可を持つ有効な AWS 認証済みセッションが必要です。次のステップに従って、セッションを認証します。

1.  AWS CLI と、有効な認証情報が設定された名前付きプロファイルがない場合は、「」を参照してください[AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)。

1.  AWS CLI が正しく設定されており、ユーザーが次のテストレコードを発行して Kinesis データストリームに書き込むアクセス許可を持っていることを確認します。

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. IDE に統合するプラグインがある場合は AWS、それを使用して IDE で実行されているアプリケーションに認証情報を渡すことができます。詳細については、「[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)」および「[AWS Toolkit for Eclipse](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)」を参照してください。

## Apache Flink Streaming Java Code のダウンロードと検証
<a name="get-started-exercise-5"></a>

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` ディレクトリに移動します。

### アプリケーションコンポーネントを確認する
<a name="get-started-exercise-5-1"></a>

アプリケーションは `com.amazonaws.services.msf.BasicStreamingJob` クラスで完全に実装されています。`main()` メソッドは、ストリーミングデータを処理して実行するデータフローを定義します。

**注記**  
最適化された開発者エクスペリエンスを IDE で開発するため、アプリケーションは Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに実行されるように設計されています。
+ Amazon Managed Service for Apache Flink および IDE で実行するときに動作するようにランタイム設定を読み取るため、アプリケーションは IDE でローカルにスタンドアロンとして実行されているかどうか自動的に検出します。この場合、アプリケーションはランタイム設定を異なる方法で読み込みます。

  1. アプリケーションが IDE でスタンドアロンモードで実行されていることが検出されたら、プロジェクトの **[リソース]** フォルダに含まれている `application_properties.json` ファイルを作成します。ファイルの内容は次のようになります。

  1. アプリケーションが Amazon Managed Service for Apache Flink で実行されると、デフォルトの動作により、Amazon Managed Service for Apache Flink アプリケーションで定義するランタイムプロパティからアプリケーション設定が読み込まれます。「[Managed Service for Apache Flink アプリケーションを作成して設定する](#get-started-exercise-7)」を参照してください。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` メソッドにより、アプリケーションのデータフローが定義されて実行されます。
  + デフォルトのストリーミング環境を初期化します。この例では、DataSteam API および `StreamTableEnvironment` で使用される両方の `StreamExecutionEnvironment` を作成し、SQL および Table API と使用する方法が示されます。2 つの環境オブジェクトは、異なる API を使用するための、同じランタイム環境への 2 つの別々のリファレンスです。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + アプリケーション設定パラメータを読み込みます。アプリケーションが実行されている場所に応じて、正しい場所から自動的にロードします。

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + アプリケーションは [Kinesis Consumer](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) コネクタを使用してソースを定義して、入力ストリームからデータを読み取ります。入力ストリームの設定は、`PropertyGroupId`=`InputStream0` で定義されています。ストリームの名前およびリージョンは、それぞれ `stream.name` および `aws.region` という名前のプロパティにあります。簡素化するため、このソースは文字列としてレコードを読み取ります。

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + 次に、アプリケーションは [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) コネクタを使用してシンクを定義し、出力ストリームにデータを送信します。出力ストリーム名およびリージョンは入力ストリームと同様に `PropertyGroupId`=`OutputStream0` で定義されます。シンクは、ソースからデータを取得している内部 `DataStream` に直接接続されています。実際のアプリケーションでは、ソースとシンクの間に何らかの変換があります。

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + 最後に、先ほど定義したデータフローを実行します。これは、データフローに必要なすべてのオペレータを定義した後の、`main()` メソッドの最後の命令である必要があります。

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### pom.xml ファイルを使用する
<a name="get-started-exercise-5-2"></a>

pom.xml ファイルによってアプリケーションに必要なすべての依存関係が定義され、Flink に必要なすべての依存関係を含む fat-jar を構築するため、Maven Shade プラグインが設定されます。
+ 一部の依存関係には `provided` スコープがあります。これらの依存関係は、アプリケーションが Amazon Managed Service for Apache Flink で実行されると自動的に利用可能となります。アプリケーションをコンパイルしたり、IDE でアプリケーションをローカルで実行したりするために必要です。詳細については、「[アプリケーションをローカルで実行する](#get-started-exercise-5-run)」を参照してください。Amazon Managed Service for Apache Flink で使用するランタイムと同じ Flink バージョンを使用していることを確認してください。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ このアプリケーションで使用される [Kinesis コネクタ](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)など、デフォルトのスコープを持つ pom に Apache Flink 依存関係を追加する必要があります。詳細については、「[Apache Flink コネクタを使用する](how-flink-connectors.md)」を参照してください。アプリケーションに必要なその他の Java 依存関係を追加することもできます。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Maven Java Compiler プラグインにより、Java 11 (Apache Flink で現在サポートされている JDK バージョン) に対してコードがコンパイルされていることが確認されます。
+ ランタイムによって提供される一部のライブラリを除き、Maven Shade プラグインによって fat-jar がパッケージ化されます。`ServicesResourceTransformer` および `ManifestResourceTransformer` という 2 つのトランスフォーマーも指定されます。後者は `main` メソッドを含むクラスを設定して、アプリケーションを起動します。メインクラスの名前を変更する場合、このトランスフォーマーを必ず更新してください。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 入力ストリームにサンプルレコードを書き込む
<a name="get-started-exercise-5-4"></a>

このセクションでは、アプリケーションが処理するサンプルレコードをストリームに送信します。サンプルデータを生成するオプションは 2 つあり、Python スクリプトまたは [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) のいずれかを使用します。

### Python スクリプトを使用してサンプルデータを生成する
<a name="get-started-exercise-5-4-1"></a>

Python スクリプトを使用して、サンプルレコードをストリームに送信できます。

**注記**  
この Python スクリプトを実行するには、Python 3.x を使用して [AWS SDK for Python (Boto)](https://aws.amazon.com/developer/language/python/) ライブラリがインストールされている必要があります。

**Kinesis 入力ストリームにテストデータの送信を開始する方法**

1. [Data Generator GitHub リポジトリ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator)から Data Generator `stock.py` Python スクリプトをダウンロードします。

1. `stock.py` スクリプトを実行します。

   ```
   $ python stock.py
   ```

チュートリアルの残りの部分を実践する間、スクリプトを実行し続けてください。Apache Flink アプリケーションを実行できるようになりました。

### Kinesis Data Generator を使用してサンプルデータを生成する
<a name="get-started-exercise-5-4-2"></a>

Python スクリプトを使用する代替手段として、[ホストバージョン](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)でも利用可能な [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) を使用し、ランダムなサンプルデータをストリームに送信できます。Kinesis Data Generator はブラウザで実行されるため、マシンに何もインストールする必要はありません。

**Kinesis Data Generator を設定して実行する方法**

1. 「[Kinesis Data Generator ドキュメント](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)」の指示に従って、ツールへのアクセスを設定します。ユーザーとパスワードを設定する CloudFormation テンプレートを実行します。

1. CloudFormation テンプレートによって生成された URL を介して Kinesis Data Generator にアクセスします。CloudFormation テンプレートが完了したら、**[出力]** タブに URL が表示されます。

1. Data Generator を設定します。
   + **リージョン:** このチュートリアルで使用しているリージョン (us-east-1) を選択します。
   + **ストリーム/配信ストリーム:** アプリケーションが使用する入力ストリーム (`ExampleInputStream`) を選択します。
   + **1 秒あたりのレコード数:** 100
   + **レコードテンプレート:** 次のテンプレートをコピーして貼り付けます。

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. テンプレートをテストする: **[テンプレートのテスト]** を選択し、生成されたレコードが次の内容と同じであることを確認してください。

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Data Generator を起動する: **[データ送信の選択]** を選択します。

現在、Kinesis Data Generator は `ExampleInputStream` にデータを送信しています。

## アプリケーションをローカルで実行する
<a name="get-started-exercise-5-run"></a>

Flink アプリケーションを IDE でローカルで実行およびデバッグできます。

**注記**  
続行する前に、入力ストリームと出力ストリームが利用できることを確認してください。「[2 つの Amazon Kinesis Data Streams を作成する](#get-started-exercise-1)」を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認してください。「[AWS セッションを認証する](#get-started-exercise-2-5)」を参照してください。  
ローカル開発環境をセットアップするには、Java 11 JDK、Apache Maven、Java 開発用の IDE が必要です。必要な前提条件を満たしていることを確認してください。「[演習を完了するための前提条件を満たす](getting-started.md#setting-up-prerequisites)」を参照してください。

### Java プロジェクトを IDE にインポートする
<a name="get-started-exercise-5-run-1"></a>

IDE でアプリケーションの使用を開始するには、Java プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、`./java/GettingStarted` サブディレクトリのコンテンツを IDE にインポートします。

Maven を使用して、コードを既存の Java プロジェクトとして挿入します。

**注記**  
新しい Java プロジェクトをインポートする正確なプロセスは、使用している IDE によって異なります。

### ローカルアプリケーション設定を確認する
<a name="get-started-exercise-5-run-2"></a>

ローカルで実行すると、アプリケーションでは `./src/main/resources` のプロジェクトのリソースフォルダにある `application_properties.json` ファイルの設定が使用されます。このファイルを編集して、異なる Kinesis ストリーム名またはリージョンを使用できます。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### IDE の実行設定を設定する
<a name="get-started-exercise-5-run-3"></a>

任意の Java アプリケーションを実行する場合と同様に、メインクラス `com.amazonaws.services.msf.BasicStreamingJob` を実行することで、IDE から直接 Flink アプリケーションを実行およびデバッグできます。アプリケーションを実行する前に、実行設定を設定する必要があります。セットアップは使用している IDE によって異なります。例えば、「IntelliJ IDEA ドキュメント」の「[Run/debug configurations](https://www.jetbrains.com/help/idea/run-debug-configuration.html)」を参照してください。特に、次の内容を設定する必要があります。

1. **クラスパスに `provided` 依存関係を追加します**。ローカルで実行するとき、`provided` スコープを持つ依存関係がアプリケーションに渡されることを確認するために必要です。この設定を行わないと、アプリケーションに `class not found` エラーが直ちに表示されます。

1. **Kinesis ストリームにアクセスするための AWS 認証情報をアプリケーションに渡します**。最も手軽な方法は、[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) を使用することです。実行設定でこの IDE プラグインを使用すると、特定の AWS プロファイルを選択できます。 AWS 認証は、このプロファイルを使用して行われます。 AWS 認証情報を直接渡す必要はありません。

1. IDE が **JDK 11** を使用してアプリケーションを実行することを確認してください。

### IDE でアプリケーションを実行する
<a name="get-started-exercise-5-run-4"></a>

`BasicStreamingJob` の実行設定を設定したら、通常の Java アプリケーションのように実行またはデバッグできます。

**注記**  
コマンドラインから `java -jar ...` を使用して、Maven によって生成される fat-jar を直接実行することはできません。この jar には、アプリケーションをスタンドアロンで実行するために必要な Flink コア依存関係は含まれていません。

アプリケーションが正常に起動すると、スタンドアロンのミニクラスターおよびコネクタの初期化に関する情報の一部がログ記録されます。この後、アプリケーションの起動時に Flink が通常出力する多数の INFO およびいくつかの WARN ログが続きます。

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

初期化が完了したら、アプリケーションはそれ以上のログエントリを出力しません。**データが流れている間、ログは出力されません。**

アプリケーションがデータを正しく処理しているかどうか確認するには、次のセクションで記述されているとおり、入出力の Kinesis ストリームを確認することができます。

**注記**  
 フローデータに関するログが出力されないことは、Flink アプリケーションの通常の動作です。すべてのレコードに関するログが出力されるとデバッグに便利ですが、実稼働での実行時にかなりのオーバーヘッドが発生する可能性があります。

## Kinesis ストリームで入出力データを観察する
<a name="get-started-exercise-input-output"></a>

Amazon Kinesis コンソールで **[データビューワー]** を使用することで、(生成サンプル Python) または Amazon Kinesis Data Generator (リンク) によって入力ストリームに送信されるレコードを観察できます。

**レコードを観察する方法**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. リージョンが、このチュートリアルを実行しているリージョンと同じであることを確認してください。デフォルトでは us-east-1 米国東部 (バージニア北部) です。リージョンが一致しない場合は変更します。

1. **[データストリーム]** を選択します。

1. `ExampleInputStream` または `ExampleOutputStream.` のいずれか、観察するストリームを選択します。

1. **[データビューワー]** タブを選択します。

1. 任意の **[シャード]** を選択し、**[最新]** を **[開始位置]** のままにして **[レコードを取得]** を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、**[レコードの取得を再試行]** を選択します。ストリームディスプレイに発行された最新レコード。

1. データ列の値を選択すると、レコードの内容を JSON 形式で確認できます。

## ローカルで実行されているアプリケーションを停止する
<a name="get-started-exercise-stop"></a>

IDE で実行されているアプリケーションを停止します。　 通常、IDE には「停止」オプションがあります。正確な場所および方法は、お使いの IDE によって異なります。

## アプリケーションコードをコンパイルしてパッケージ化する
<a name="get-started-exercise-5-5"></a>

このセクションでは、Apache Maven を使用して Java コードをコンパイルし、JAR ファイルにパッケージ化します。Maven コマンドラインツールまたは IDE を使用して、コードをコンパイルおよびパッケージ化できます。

**Maven コマンドラインを使用してコンパイルおよびパッケージ化する方法**

Java GettingStarted プロジェクトを含むディレクトリに移動し、次のコマンドを実行します。

```
$ mvn package
```

**IDE を使用してコンパイルおよびパッケージ化する方法**

IDE Maven 統合から `mvn package` を実行します。

どちらの場合でも、次の JAR ファイルが作成されます: `target/amazon-msf-java-stream-app-1.0.jar`。

**注記**  
 IDE から「ビルドプロジェクト」を実行しても、JAR ファイルが作成されない場合があります。

## アプリケーションコードの JAR ファイルをアップロードする
<a name="get-started-exercise-6"></a>

このセクションでは、前のセクションで作成した JAR ファイルを、このチュートリアルの冒頭で作成した Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。このステップを完了していない場合、「(リンク)」を参照してください。

**アプリケーションコードの JAR ファイルをアップロードする方法**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アプリケーションコード用に以前作成したバケットを選択します。

1. **アップロード**を選択します。

1. **ファイルの追加**を選択します。

1. 前のステップで生成された JAR ファイルに移動します (`target/amazon-msf-java-stream-app-1.0.jar`)。

1. 他の設定を変更せずに **[アップロード]** を選択します。

**警告**  
`<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar` で正しい JAR ファイルを選択していることを確認してください。  
`target` ディレクトリには、アップロードする必要のない他の JAR ファイルも含まれています。

## Managed Service for Apache Flink アプリケーションを作成して設定する
<a name="get-started-exercise-7"></a>

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。このチュートリアルでは、コンソールを使用します。

**注記**  
コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) リソースと Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

**Topics**
+ [

### アプリケーションの作成
](#get-started-exercise-7-console-create)
+ [

### IAM ポリシーを編集する
](#get-started-exercise-7-console-iam)
+ [

### アプリケーションを設定する
](#get-started-exercise-7-console-configure)
+ [

### アプリケーションを実行する
](#get-started-exercise-7-console-run)
+ [

### 実行中のアプリケーションのメトリクスを観察する
](#get-started-exercise-7-console-stop)
+ [

### Kinesis ストリームの出力データを観察する
](#get-started-exercise-7-console-output)
+ [

### アプリケーションを停止する
](#get-started-exercise-stop)

### アプリケーションの作成
<a name="get-started-exercise-7-console-create"></a>

**アプリケーションを作成するには**

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. 正しいリージョンが選択されていることを確認してください: us-east-1 米国東部 (バージニア北部)

1. 右側のメニューを開いて **[Apache Flink アプリケーション]** を選択し、**[ストリーミングアプリケーションの作成]** を選択します。または、最初のページの入門コンテナで **[ストリーミングアプリケーションの作成]** を選択します。

1. **[ストリーミングアプリケーションの作成]** ページで、次の操作を行います。
   + **[ストリーム処理アプリケーションの設定方法の選択]** で **[最初から作成]** を選択します。
   + **[Apache Flink の設定、Application Flink バージョン]** で **[Apache Flink 1.20]** を選択します。

1. アプリケーションを設定する
   + **[アプリケーション名]** に「**MyApplication**」と入力します。
   + **[説明]** に「**My java test app**」と入力します。
   + **[アプリケーションリソースへのアクセス]** で **[必要なポリシーを使用して IAM ロール `kinesis-analytics-MyApplication-us-east-1` を作成/更新]** を選択します。

1. **[アプリケーション設定用テンプレート]** を設定します
   + **[テンプレート]** で **[開発]** を選択します。

1. ページの下部にある **[ストリーミングアプリケーションの作成]** を選択します。

**注記**  
コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-MyApplication-us-east-1`
ロール: `kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service for Apache Flink は、以前は Kinesis Data Analytics と呼ばれていました。自動的に生成されるリソースの名前には、下位互換性のために「`kinesis-analytics-`」のプレフィックスが付きます。

### IAM ポリシーを編集する
<a name="get-started-exercise-7-console-iam"></a>

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

**ポリシーを編集する方法**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-east-1`** ポリシーを選択します。

1. **[編集]** を選択して、**[JSON]** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (*012345678901*) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  ページの下部にある **[次へ]** を選択して、**[変更を保存]** を選択します。

### アプリケーションを設定する
<a name="get-started-exercise-7-console-configure"></a>

アプリケーション設定を編集して、アプリケーションコードアーティファクトを設定します。

**設定を編集する方法**

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. **[アプリケーションコードの場所]** セクションで、次の操作を行います。
   + **[Amazon S3 バケット]** には、アプリケーションコード用に以前作成したバケットを選択します。**[参照]** を選択して正しいバケットを選択したら、**[選択]** を選択します。バケット名はクリックしないでください。
   + [**Amazon S3 オブジェクトへのパス**] で、**amazon-msf-java-stream-app-1.0.jar**と入力します。

1. **[アクセス許可]** には、**[必要なポリシーで IAM ロール `kinesis-analytics-MyApplication-us-east-1` を作成/更新]** を選択します。

1. **[ランタイムプロパティ]** セクションで、次のプロパティを追加します。

1. **[新しい項目の追加]** を選択して、次のパラメータをすべて追加します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/get-started-exercise.html)

1. 他のセクションは変更しないでください。

1. **[Save changes]** (変更の保存) をクリックします。

**注記**  
Amazon CloudWatch ログ記録を有効にすることを選択すると、ロググループとログストリームが Kinesis Data Analytics によって作成されます。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`

### アプリケーションを実行する
<a name="get-started-exercise-7-console-run"></a>

これでアプリケーションが設定され、実行する準備が整いました。

**アプリケーションを実行するには**

1. Amazon Managed Service for Apache Flink のコンソールで、**[自分のアプリケーション]** を選択して **[実行]** を選択します。

1. 次のページのアプリケーション復元設定ページで、**[最新のスナップショットで実行]** を選択したら、**[実行]** を選択します。

   **[アプリケーション詳細]** の **[ステータス]** は「`Ready`」から「`Starting`」に移行し、アプリケーションが起動されると「`Running`」に移行します。

アプリケーションが「`Running`」ステータスのとき、Flink ダッシュボードを開けるようになります。

**ダッシュボードを開くには**

1. **[Open Apache Flink ダッシュボード]** を選択します。ダッシュボードは新しいページで開かれます。

1. **[実行中のジョブ]** リストで、表示されている 1 つのジョブを選択します。
**注記**  
ランタイムプロパティを設定したり、IAM ポリシーを誤って編集したりすると、アプリケーションのステータスが「`Running`」になることがありますが、Flink ダッシュボードではジョブが継続的に再起動されていることが表示されます。これは、アプリケーションの設定が間違っているか、外部リソースへのアクセス許可がない場合の一般的な障害シナリオです。  
これが発生した場合、Flink ダッシュボードの **[例外]** タブを見て、問題の原因を確認してください。

### 実行中のアプリケーションのメトリクスを観察する
<a name="get-started-exercise-7-console-stop"></a>

**[MyApplication]** ページの **[Amazon CloudWatch メトリクス]** セクションで、実行中のアプリケーションの基本的なメトリクスの一部を確認できます。

**メトリクスを表示する方法**

1. **[更新]** ボタンの横にあるドロップダウンリストから **[10 秒]** を選択します。

1. アプリケーションが実行中で正常なとき、**[アップタイム]** メトリクスが継続的に増加していることを確認できます。

1. **[完全再起動]** メトリクスはゼロである必要があります。増加している場合、設定に問題がある可能性があります。問題を調査するには、Flink ダッシュボードの **[例外]** タブを確認してください。

1. 正常なアプリケーションでは、**[失敗したチェックポイント数]** メトリクスは 0 です。
**注記**  
このダッシュボードには、5 分の粒度で一定の一連のメトリクスが表示されます。CloudWatch ダッシュボードで任意のメトリクスを使用してカスタムアプリケーションのダッシュボードを作成できます。

### Kinesis ストリームの出力データを観察する
<a name="get-started-exercise-7-console-output"></a>

Python スクリプトまたは Kinesis Data Generator のいずれかを使用して、入力にデータを引き続き発行していることを確認してください。

「[https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)」のデータビューワーを使用 (以前に行った内容と同様に) することで、Managed Service for Apache Flink で実行されているアプリケーションの出力を観察できるようになりました。

**出力の表示方法**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. リージョンが、このチュートリアルの実行に使用しているリージョンと同じであることを確認してください。デフォルトでは、us-east-1 米国東部 (バージニア北部) です。必要に応じてリージョンを変更します。

1. **[データストリーム]** を選択します。

1. 観察するストリームを選択します。このチュートリアルでは、`ExampleOutputStream` を使用します。

1.  **[データビューワー]** タブを選択します。

1. 任意の **[シャード]** を選択し、**[最新]** を **[開始位置]** のままにして **[レコードの取得]** を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、**[レコードの取得を再試行]** を選択します。ストリームディスプレイに発行された最新レコード。

1. データ列の値を選択して、レコードの内容を JSON 形式で確認します。

### アプリケーションを停止する
<a name="get-started-exercise-stop"></a>

アプリケーションを停止するには、`MyApplication` という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動します。

**アプリケーションを停止するには**

1. **[アクション]** ドロップダウンリストで、**[停止]** を選択します。

1. **[アプリケーション詳細]** の **[ステータス]** は「`Running`」から「`Stopping`」に移行し、アプリケーションが完全に停止すると「`Ready`」に移行します。
**注記**  
Python スクリプトまたは Kinesis Data Generator から入力ストリームへのデータ送信も必ず停止してください。

## 次のステップ
<a name="get-started-exercise-next-step-4"></a>

[AWS リソースをクリーンアップする](getting-started-cleanup.md)

# AWS リソースをクリーンアップする
<a name="getting-started-cleanup"></a>

このセクションでは、この入門 (DataStream API) チュートリアルで作成した AWS リソースをクリーンアップする手順について説明します。

**Topics**
+ [

## Managed Service for Apache Flink アプリケーションを削除する
](#getting-started-cleanup-app)
+ [

## Kinesis Data Streams を削除する
](#getting-started-cleanup-stream)
+ [

## Amazon S3 オブジェクトとバケットを削除する
](#getting-started-cleanup-s3)
+ [

## IAM リソースを削除する
](#getting-started-cleanup-iam)
+ [

## CloudWatch リソースを削除する
](#getting-started-cleanup-cw)
+ [

## Apache Flink の他のリソースを調べる
](#getting-started-cleanup-next-step-5)

## Managed Service for Apache Flink アプリケーションを削除する
<a name="getting-started-cleanup-app"></a>

アプリケーションを削除するには、次の手順に従います。

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. Apache Flink 用 Managed Serviceパネルで、**MyApplication** を選択します。

1. **[アクション]** ドロップダウンリストで **[削除]** を選択したら、削除を確定します。

## Kinesis Data Streams を削除する
<a name="getting-started-cleanup-stream"></a>

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. **[データストリーム]** を選択します。

1. 作成した 2 つのストリーム、`ExampleInputStream` および `ExampleOutputStream` を選択します。

1. **[アクション]** ドロップダウンリストで **[削除]** を選択したら、削除を確定します。

## Amazon S3 オブジェクトとバケットを削除する
<a name="getting-started-cleanup-s3"></a>

次の手順に従って Amazon S3 オブジェクトおよびバケットを削除します。

**S3 バケットからオブジェクトを削除する方法**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アプリケーションアーティファクト用に作成した S3 バケットを選択します。

1. アップロードした `amazon-msf-java-stream-app-1.0.jar` という名前のアプリケーションアーティファクトを選択します。

1. **[削除]** を選択し、削除を確定します。

**S3 バケットを削除するには**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アーティファクト用に作成したバケットを選択します。

1. **[削除]** を選択し、削除を確定します。
**注記**  
S3 バケットを削除するには、空である必要があります。

## IAM リソースを削除する
<a name="getting-started-cleanup-iam"></a>

**IAM リソースを削除するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションバーで、[**ポリシー**] を選択します。

1. フィルターコントロールに「**kinesis**」と入力します。

1. **[kinesis-analytics-service-MyApplication-us-east-1]** ポリシーを選択します。

1. [**ポリシーアクション**]、[**削除**] の順に選択します。

1. ナビゲーションバーで ［**ロール**］を選択します。

1. **[kinesis-analytics-MyApplication-us-east-1]** ロールを選択します。

1. **[ロールの削除]** を選択し、削除を確定します。

## CloudWatch リソースを削除する
<a name="getting-started-cleanup-cw"></a>

1. CloudWatch コンソールの [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) を開いてください。

1. ナビゲーションバーで [**ログ**] を選択します。

1. 「**/aws/kinesis-analytics/MyApplication**」ロググループを選択してください。

1. [**ロググループの削除**]を選択し、削除を確認してください。

## Apache Flink の他のリソースを調べる
<a name="getting-started-cleanup-next-step-5"></a>

[他のリソースを調べる](getting-started-next-steps.md)

# 他のリソースを調べる
<a name="getting-started-next-steps"></a>

Apache Flink 用 Managed Serviceの基本的なアプリケーションを作成して実行したので、より高度な Apache Flink 用 Managed Serviceソリューションについては、以下のリソースを参照してください。
+ **[Amazon Managed Service for Apache Flink ワークショップ](https://catalog.workshops.aws/managed-flink):** このワークショップでは、ストリーミングデータをほぼリアルタイムで取り込み、分析、視覚化するエンドツーエンドのストリーミングアーキテクチャを構築します。あなたは、ニューヨーク市のあるタクシー会社の業務改善に着手しました。ニューヨーク市のタクシー車両のテレメトリデータをほぼリアルタイムで分析して、車両運用を最適化します。
+ 「**[Managed Service for Apache Flink アプリケーションの作成と操作の例](examples-collapsibles.md):**」この開発者ガイドのこのセクションでは、Apache Flink のマネージドサービスでアプリケーションを作成および操作する例を紹介します。これらの例は、 Managed Service for Apache Flink アプリケーションを作成し、結果をテストするために役立つコード例と詳しい手順が含まれます。
+ 「**[Learn Flink: ハンズオントレーニング](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/learn-flink/overview/):**」スケーラブルなストリーミング ETL、分析、イベント駆動型アプリケーションの作成を開始するための Apache Flink の公式入門トレーニングです。