Managed Service for Apache Flink アプリケーションを作成して実行する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink アプリケーションを作成して実行する

このステップでは、ソースとシンクとして Kinesis データストリームを使用して Managed Service for Apache Flink アプリケーションを作成します。

依存リソースを作成する

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。

  • 入出力用の 2 つの Kinesis データストリーム

  • アプリケーションのコードを保存する Amazon S3 バケット

    注記

    このチュートリアルでは、us-east-1 米国東部 (バージニア北部) リージョンにアプリケーションをデプロイしていることを前提としています。別のリージョンを使用する場合は、それに応じてすべてのステップを適応させます。

Amazon Kinesis データストリームを 2 つ作成する

この演習で Apache Flink アプリケーションのマネージドサービスを作成する前に、2 つの Kinesis データストリーム (ExampleInputStreamExampleOutputStream) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは、Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールでの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」 の 「データストリームの作成および更新」 を参照してください。を使用してストリームを作成するには AWS CLI、アプリケーションに使用するリージョンに合わせて、次のコマンドを使用します。

データストリームを作成するには (AWS CLI)
  1. 最初のストリーム (ExampleInputStream) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. アプリケーションが出力の書き込みに使用する 2 番目のストリームを作成するには、同じコマンドを実行し、ストリーム名を に変更しますExampleOutputStream

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

アプリケーションコードの Amazon S3 バケットを作成する

Amazon S3 バケットは、コンソールを使用して作成できます。コンソールを使用して Amazon S3 バケットを作成する方法については、「Amazon Amazon S3ユーザーガイド」の「バケットの作成」を参照してください。Amazon S3 バケットには、ログイン名を追加するなど、グローバルに一意の名前で名前を付けます。

注記

このチュートリアルに使用するリージョン (us-east-1) にバケットを作成してください。

その他のリソース

アプリケーションを作成すると、Managed Service for Apache Flink は、次の Amazon CloudWatch リソースがまだ存在しない場合、自動的に作成します。

  • /AWS/KinesisAnalytics-java/<my-application>という名前のロググループ。

  • kinesis-analytics-log-stream というログストリーム

ローカルの開発環境のセットアップ

開発とデバッグのために、IDE選択したマシンで Apache Flink アプリケーションを直接実行できます。Apache Flink の依存関係は、Apache Maven を使用して通常の Java 依存関係のように処理されます。

注記

開発マシンには、Java JDK11、Maven、Git がインストールされている必要があります。Eclipse Java NeonIntelliJ IDEAなどの開発環境を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、「」を参照してください演習を完了するための前提条件を満たす。マシンに Apache Flink クラスターをインストールする必要はありません

セッションを認証する AWS

アプリケーションは Kinesis データストリームを使用してデータを公開します。ローカルで実行する場合は、Kinesis データストリームに書き込むアクセス許可を持つ有効な AWS 認証済みセッションが必要です。セッションを認証するには、次の手順に従います。

  1. AWS CLI と、有効な認証情報が設定されている名前付きプロファイルがない場合は、「」を参照してくださいAWS Command Line Interface (AWS CLI) のセットアップ

  2. AWS CLI が正しく設定されており、次のテストレコードを発行して Kinesis データストリームに書き込むアクセス許可がユーザーに付与されていることを確認します。

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. と統合するプラグインIDEが にある場合は AWS、それを使用して、 で実行されているアプリケーションに認証情報を渡すことができますIDE。詳細については、AWS 「 Toolkit for IntelliJIDEA」およびAWS 「 Toolkit for Eclipse 」を参照してください。

Apache Flink ストリーミング Java コードをダウンロードして調べる

この例の Java アプリケーションコードは、 から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted ディレクトリに移動します。

アプリケーションコンポーネントを確認する

アプリケーションはcom.amazonaws.services.msf.BasicStreamingJobクラスに完全に実装されます。main() メソッドは、ストリーミングデータを処理して実行するデータフローを定義します。

注記

開発者エクスペリエンスを最適化するために、アプリケーションは、Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに で実行し、 で開発するように設計されていますIDE。

  • Amazon Managed Service for Apache Flink および で実行したときに動作するようにランタイム設定を読み取るためにIDE、アプリケーションは でローカルでスタンドアロンで実行されているかどうかを自動的に検出しますIDE。この場合、アプリケーションはランタイム設定を次のようにロードします。

    1. アプリケーションで のスタンドアロンモードで実行されていることが検出されたらIDE、プロジェクトのリソースフォルダに含まれるapplication_properties.jsonファイルを形成します。ファイルの内容は次のとおりです。

    2. アプリケーションが Amazon Managed Service for Apache Flink で実行されると、デフォルトの動作により、Amazon Managed Service for Apache Flink アプリケーションで定義するランタイムプロパティからアプリケーション設定がロードされます。「Managed Service for Apache Flink アプリケーションの作成と設定」を参照してください。

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • main() メソッドは、アプリケーションデータフローを定義し、実行します。

    • デフォルトのストリーミング環境を初期化します。この例では、 で使用する DataSteam APIと、 StreamExecutionEnvironmentで使用する StreamTableEnvironmentSQLと、 テーブル の両方を作成する方法を示しますAPI。2 つの環境オブジェクトは、異なる を使用するように、同じランタイム環境への 2 つの異なる参照ですAPIs。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • アプリケーション設定パラメータをロードします。これにより、アプリケーションが実行されている場所に応じて、正しい場所から自動的にロードされます。

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • アプリケーションは、入力ストリームからデータを読み取るために Kinesis Consumer コネクタを使用してソースを定義します。入力ストリームの設定は、 PropertyGroupId= で定義されますInputStream0。ストリームの名前とリージョンは、aws.regionそれぞれ stream.nameおよび という名前のプロパティにあります。簡単にするために、このソースはレコードを文字列として読み取ります。

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • 次に、アプリケーションは Kinesis Streams Sink コネクタを使用してシンクを定義し、出力ストリームにデータを送信します。出力ストリーム名とリージョンはOutputStream0、入力ストリームと同様に PropertyGroupId= で定義されます。シンクは、ソースからデータを取得DataStreamしている内部に直接接続されます。実際のアプリケーションでは、ソースとシンクの間に何らかの変換があります。

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • 最後に、先ほど定義したデータフローを実行します。これは、データフローが必要とするすべての演算子を定義した後、 main()メソッドの最後の命令である必要があります。

      env.execute("Flink streaming Java API skeleton");

pom.xml ファイルを使用する

pom.xml ファイルは、アプリケーションに必要なすべての依存関係を定義し、Flink に必要なすべての依存関係を含む fat-jar を構築するために Maven Shade プラグインを設定します。

  • 一部の依存関係にはprovidedスコープがあります。これらの依存関係は、アプリケーションが Amazon Managed Service for Apache Flink で実行されるときに自動的に使用できます。アプリケーションをコンパイルしたり、 でアプリケーションをローカルで実行したりするために必要ですIDE。詳細については、「アプリケーションをローカルで実行する」を参照してください。Amazon Managed Service for Apache Flink で使用するランタイムと同じ Flink バージョンを使用していることを確認してください。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • このアプリケーションで使用される Kinesis コネクタなど、デフォルトのスコープを持つ pom に Apache Flink 依存関係を追加する必要があります。詳細については、「Managed Service for Apache Flink で Apache Flink コネクタを使用する」を参照してください。アプリケーションに必要な追加の Java 依存関係を追加することもできます。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Maven Java Compiler プラグインは、コードが Apache Flink で現在サポートされているJDKバージョンである Java 11 に対してコンパイルされていることを確認します。

  • Maven Shade プラグインは fat-jar をパッケージ化します。ただし、ランタイムによって提供される一部のライブラリは除きます。また、 ServicesResourceTransformerと の 2 つのトランスフォーマーも指定しますManifestResourceTransformer。後者は、 mainメソッドを含むクラスを設定して、アプリケーションを起動します。メインクラスの名前を変更する場合は、このトランスフォーマーの更新を忘れないでください。

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

サンプルレコードを入力ストリームに書き込む

このセクションでは、アプリケーションが処理するためにサンプルレコードをストリームに送信します。サンプルデータを生成するには、Python スクリプトまたは Kinesis Data Generator を使用して 2 つのオプションがあります。

Python スクリプトを使用してサンプルデータを生成する

Python スクリプトを使用して、サンプルレコードをストリームに送信できます。

注記

この Python スクリプトを実行するには、Python 3.x を使用し、 AWS SDK for Python (Boto) ライブラリがインストールされている必要があります。

Kinesis 入力ストリームへのテストデータの送信を開始するには:

  1. データジェネレータ GitHub リポジトリ からデータジェネレータ stock.py Python スクリプトをダウンロードします。

  2. stock.py スクリプトを実行します。

    $ python stock.py

チュートリアルの残りの部分を完了する間は、スクリプトを実行したままにします。Apache Flink アプリケーションを実行できるようになりました。

Kinesis Data Generator を使用してサンプルデータを生成する

Python スクリプトを使用する代わりに、ホストバージョン でも利用可能な Kinesis Data Generator を使用して、ランダムなサンプルデータをストリームに送信できます。Kinesis Data Generator はブラウザで実行され、マシンに何もインストールする必要はありません。

Kinesis Data Generator をセットアップして実行するには:

  1. Kinesis Data Generator ドキュメントの指示に従って、ツールへのアクセスを設定します。ユーザーとパスワードを設定する AWS CloudFormation テンプレートを実行します。

  2. テンプレートによってURL生成された から Kinesis Data Generator CloudFormationにアクセスします。 CloudFormation テンプレートの完了後、出力タブURLに が表示されます。

  3. データジェネレータを設定します。

    • リージョン: このチュートリアルで使用しているリージョンを選択します: us-east-1

    • ストリーム/配信ストリーム: アプリケーションが使用する入力ストリームを選択します。 ExampleInputStream

    • 1 秒あたりのレコード数: 100

    • レコードテンプレート: 次のテンプレートをコピーして貼り付けます。

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. テンプレートをテストする: テストテンプレートを選択し、生成されたレコードが次のようになっていることを確認します。

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. データジェネレーターを起動する: データの送信を選択します

Kinesis Data Generator が にデータを送信していますExampleInputStream

アプリケーションをローカルで実行する

Flink アプリケーションをローカルで で実行およびデバッグできますIDE。

注記

続行する前に、入力ストリームと出力ストリームが使用可能であることを確認します。「Amazon Kinesis データストリームを 2 つ作成する」を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認します。「セッションを認証する AWS」を参照してください。

ローカル開発環境を設定するにはJDK、Java 11、Apache Maven、および Java 開発IDE用 が必要です。必要な前提条件を満たしていることを確認します。「演習を完了するための前提条件を満たす」を参照してください。

Java プロジェクトを にインポートする IDE

でアプリケーションの使用を開始するにはIDE、Java プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、./java/GettingStartedサブディレクトリのコンテンツを にインポートしますIDE。

Maven を使用して、コードを既存の Java プロジェクトとして挿入します。

注記

新しい Java プロジェクトをインポートする正確なプロセスは、IDE使用している によって異なります。

ローカルアプリケーション設定を確認する

ローカルで実行する場合、アプリケーションは のプロジェクトのリソースフォルダにある application_properties.json ファイルの設定を使用します./src/main/resources。このファイルを編集して、異なる Kinesis ストリーム名またはリージョンを使用できます。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

IDE 実行設定をセットアップする

Java アプリケーションを実行する場合と同様にcom.amazonaws.services.msf.BasicStreamingJob、メインクラス を実行するIDEことで、 から直接 Flink アプリケーションを実行およびデバッグできます。アプリケーションを実行する前に、実行設定を設定する必要があります。セットアップはIDE、使用している によって異なります。例えば、IntelliJ IDEAドキュメントの「実行/デバッグ設定」を参照してください。特に、以下を設定する必要があります。

  1. クラスパス にprovided依存関係を追加します。これは、ローカルで実行するときに、providedスコープを持つ依存関係がアプリケーションに渡されることを確認するために必要です。この設定を行わないと、アプリケーションはすぐにclass not foundエラーを表示します。

  2. AWS 認証情報を渡して、Kinesis ストリームにアクセスします。最も速い方法は、 AWS Toolkit for IntelliJ IDEAを使用することです。実行設定でこのIDEプラグインを使用すると、特定の AWS プロファイルを選択できます。 AWS 認証は、このプロファイルを使用して行われます。認証情報を直接渡す AWS 必要はありません。

  3. JDK11 を使用してアプリケーションIDEを実行していることを確認します。

でアプリケーションを実行する IDE

の実行設定をセットアップしたらBasicStreamingJob、通常の Java アプリケーションのように実行またはデバッグできます。

注記

Maven によって生成された fat-jar を コマンドラインjava -jar ...から直接実行することはできません。この jar には、アプリケーションをスタンドアロンで実行するために必要な Flink コア依存関係は含まれていません。

アプリケーションが正常に起動すると、スタンドアロンのミニクラスターとコネクタの初期化に関する情報がログに記録されます。この後、アプリケーションの起動時に Flink が通常出力する INFOと一部のWARNログが続きます。

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

初期化が完了すると、アプリケーションはそれ以上のログエントリを出力しません。データが流れる間は、ログは出力されません。

アプリケーションが正しくデータを処理しているかどうかを確認するには、次のセクションで説明するように、入出力 Kinesis ストリームを検査できます。

注記

フローデータに関するログを出力しないことは、Flink アプリケーションの通常の動作です。すべてのレコードにログを送信するとデバッグに便利ですが、本番稼働時に大幅なオーバーヘッドが発生する可能性があります。

Kinesis ストリームで入出力データを監視する

Amazon Kinesis コンソールの Data Viewer を使用して、 (サンプル Python を生成) または Amazon Kinesis Data Generator (リンク) によって入力ストリームに送信されたレコードを確認できます。

レコードを観察するには
  1. https://console.aws.amazon.com/kinesis で Kinesis コンソールを開きます。

  2. このチュートリアルを実行しているリージョンが同じであることを確認します。デフォルトでは us-east-1 US East (バージニア北部) です。一致しない場合は、リージョンを変更します。

  3. データストリーム を選択します。

  4. 監視するストリームを ExampleInputStreamまたは のいずれかで選択します。 ExampleOutputStream.

  5. データビューワータブを選択します。

  6. 任意のシャード を選択し、最新の を開始位置 として保持し、レコードの取得 を選択します。このリクエストの「レコードが見つかりません」エラーが表示される場合があります。その場合は、レコードの取得を再試行する を選択します。ストリームディスプレイに発行される最新のレコード。

  7. データ列の値を選択して、レコードの内容を JSON 形式で検査します。

ローカルで実行されているアプリケーションを停止する

で実行されているアプリケーションを停止しますIDE。IDE は通常、「停止」オプションを提供します。正確な場所と方法はIDE、使用している によって異なります。

アプリケーションコードをコンパイルしてパッケージ化する

このセクションでは、Apache Maven を使用して Java コードをコンパイルし、JARファイルにパッケージ化します。Maven コマンドラインツールまたは を使用して、コードをコンパイルしてパッケージ化できますIDE。

Maven コマンドラインを使用してコンパイルおよびパッケージ化するには:

Java GettingStarted プロジェクトを含むディレクトリに移動し、次のコマンドを実行します。

$ mvn package

を使用してコンパイルおよびパッケージ化するにはIDE:

IDE Maven 統合mvn packageから を実行します。

どちらの場合も、次のJARファイルが作成されます: target/amazon-msf-java-stream-app-1.0.jar

注記

から「ビルドプロジェクト」を実行しても、JARファイルが作成されないIDE可能性があります。

アプリケーションコードJARファイルをアップロードする

このセクションでは、前のセクションで作成したJARファイルを、このチュートリアルの冒頭で作成した Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。このステップを完了していない場合は、 (リンク) を参照してください。

アプリケーションコードJARファイルをアップロードするには
  1. で Amazon S3 コンソールを開きますhttps://console.aws.amazon.com/s3/

  2. アプリケーションコード用に以前に作成したバケットを選択します。

  3. アップロードを選択します。

  4. ファイルの追加を選択します。

  5. 前のステップで生成されたJARファイルに移動します。 target/amazon-msf-java-stream-app-1.0.jar

  6. 他の設定を変更せずにアップロードを選択します。

警告

で正しいJARファイルを選択してください<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar

target ディレクトリには、アップロードする必要のない他のJARファイルも含まれています。

Managed Service for Apache Flink アプリケーションの作成と設定

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。このチュートリアルでは、 コンソールを使用します。

注記

コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) と Amazon CloudWatch Logs リソースが作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

アプリケーションの作成

アプリケーションを作成するには
  1. https://console.aws.amazon.com/flink で Managed Service for Apache Flink コンソールを開きます。

  2. 正しいリージョンが選択されていることを確認します: us-east-1 US East (バージニア北部)

  3. 右側のメニューを開き、Apache Flink アプリケーションを選択し、ストリーミングアプリケーションを作成します。または、最初のページのスタートコンテナでストリーミングアプリケーションを作成するを選択します。

  4. ストリーミングアプリケーションの作成ページで、次の操作を行います。

    • ストリーム処理アプリケーションを設定する方法を選択します。「最初から作成」を選択します。

    • Apache Flink 設定、Application Flink バージョン: Apache Flink 1.19 を選択します。

  5. アプリケーションを設定する

    • アプリケーション名: を入力しますMyApplication

    • 説明: と入力しますMy java test app

    • アプリケーションリソースへのアクセス: 必要なポリシーkinesis-analytics-MyApplication-us-east-1を使用してIAMロールを作成/更新する を選択します。

  6. アプリケーション設定用のテンプレートを設定する

    • テンプレート: 開発 を選択します。

  7. ページの下部にあるストリーミングアプリケーションの作成を選択します。

注記

コンソールを使用して Managed Service for Apache Flink アプリケーションを作成する場合、アプリケーション用にIAMロールとポリシーを作成するオプションがあります。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらのIAMリソースには、アプリケーション名とリージョンを使用して次のように名前が付けられます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-east-1

  • ロール: kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service for Apache Flink は、以前は Kinesis Data Analytics と呼ばれていました。自動作成されるリソースの名前は、下位互換性kinesis-analytics-のためにプレフィックスが付けられます。

IAM ポリシーを編集する

IAM ポリシーを編集して、Kinesis データストリームにアクセスするアクセス許可を追加します。

ポリシーを編集するには
  1. でIAMコンソールを開きますhttps://console.aws.amazon.com/iam/

  2. [Policies] (ポリシー) を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-east-1 ポリシーを選択します。

  3. 編集 を選択し、 JSONタブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルアカウントを置き換える IDs (012345678901) とアカウント ID 。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. ページの下部にあるへ を選択し、変更の保存 を選択します。

アプリケーションを設定する

アプリケーション設定を編集して、アプリケーションコードアーティファクトを設定します。

設定を編集するには
  1. MyApplication ページで、「 の設定」を選択します。

  2. アプリケーションコードの場所セクションで、次の操作を行います。

    • Amazon S3 バケット では、アプリケーションコード用に以前に作成したバケットを選択します。ブラウズを選択して正しいバケットを選択し、 を選択します。バケット名をクリックしないでください。

    • [Amazon S3 オブジェクトへのパス] で、amazon-msf-java-stream-app-1.0.jarと入力します。

  3. アクセス許可 では、必要なポリシー kinesis-analytics-MyApplication-us-east-1を使用してIAMロールを作成/更新する を選択します。

  4. ランタイムプロパティセクションで、次のプロパティを追加します。

  5. 新しい項目を追加 を選択し、次の各パラメータを追加します。

    グループ ID キー
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. 他のセクションは変更しないでください。

  7. [Save changes] (変更の保存) をクリックします。

注記

Amazon CloudWatch ログ記録を有効にすると、Managed Service for Apache Flink はロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

これでアプリケーションが設定され、実行の準備が整いました。

アプリケーションを実行するには
  1. Amazon Managed Service for Apache Flink のコンソールで、My Application を選択し、Run を選択します。

  2. 次のページのアプリケーション復元設定ページで、最新のスナップショットで実行 を選択し、 の実行 を選択します。

    アプリケーションのステータスの詳細が、アプリケーションの開始Running時に から Ready に移行Startingし、その後 に移行します。

アプリケーションが Runningステータスになったら、Flink ダッシュボードを開くことができます。

ダッシュボードを開くには
  1. Open Apache Flink ダッシュボード を選択します。ダッシュボードが新しいページで開きます。

  2. Runing jobs リストで、表示できる 1 つのジョブを選択します。

    注記

    ランタイムプロパティを設定したり、IAMポリシーを誤って編集したりすると、アプリケーションのステータスが になる可能性がありますがRunning、Flink ダッシュボードにはジョブが継続的に再起動していると表示されます。これは、アプリケーションの設定が間違っているか、外部リソースへのアクセス許可がない場合によく発生する障害シナリオです。

    この場合、Flink ダッシュボードの例外タブをチェックして、問題の原因を確認します。

実行中のアプリケーションのメトリクスを観察する

MyApplication このページの Amazon CloudWatch メトリクスセクションには、実行中のアプリケーションから基本的なメトリクスの一部が表示されます。

メトリクスを表示するには
  1. 更新ボタンの横にあるドロップダウンリストから 10 秒を選択します。

  2. アプリケーションが実行中で正常であれば、アップタイムメトリクスが継続的に増加していることを確認できます。

  3. フル再起動メトリクスはゼロである必要があります。増加している場合、設定に問題がある可能性があります。問題を調査するには、Flink ダッシュボードの例外タブを確認します。

  4. 正常なアプリケーションでは、失敗したチェックポイントメトリクスの数はゼロである必要があります。

    注記

    このダッシュボードには、5 分の粒度を持つ固定メトリクスのセットが表示されます。 CloudWatch ダッシュボードに任意のメトリクスを含むカスタムアプリケーションダッシュボードを作成できます。

Kinesis ストリームの出力データを観察する

Python スクリプトまたは Kinesis Data Generator を使用して、入力にデータを発行していることを確認します。

Managed Service for Apache Flink で実行されているアプリケーションの出力を、 のデータビューワーを使用して観察できるようになりました。これはhttps://console.aws.amazon.com/kinesis/、以前に行ったものと同様です。

出力を表示するには
  1. https://console.aws.amazon.com/kinesis で Kinesis コンソールを開きます。

  2. リージョンがこのチュートリアルの実行に使用しているリージョンと同じであることを確認します。デフォルトでは、us-east-1US East (バージニア北部) です。必要に応じてリージョンを変更します。

  3. データストリーム を選択します。

  4. 監視するストリームを選択します。このチュートリアルでは、ExampleOutputStream を使用します。

  5. データビューワータブを選択します。

  6. 任意のシャード を選択し、最新の を開始位置 として保持し、レコードの取得 を選択します。このリクエストの「レコードが見つかりません」エラーが表示される場合があります。その場合は、レコードの取得を再試行する を選択します。ストリーム表示に発行される最新のレコード。

  7. データ列の値を選択して、レコードの内容を JSON 形式で検査します。

アプリケーションの停止

アプリケーションを停止するには、 という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動しますMyApplication

アプリケーションを停止するには
  1. アクションドロップダウンリストから、停止 を選択します。

  2. アプリケーションのステータスの詳細が から Running に移行しStopping、アプリケーションが完全に停止Readyすると に変わります。

    注記

    Python スクリプトまたは Kinesis Data Generator から入力ストリームへのデータ送信も停止することを忘れないでください。

次のステップ

AWS リソースをクリーンアップする