Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Beam を使用してアプリケーションを作成する
この課題では、「Apache Beam
注記
この演習に必要な前提条件を設定するには、まずチュートリアル: Managed Service for Apache Flink で の使用 DataStream APIを開始する演習を完了してください。
このトピックには、次のセクションが含まれています。
依存リソースを作成する
この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
2 つの Kinesis Data Streams (
ExampleInputStream
とExampleOutputStream
)アプリケーションのコードを保存するためのAmazon S3バケット (
ka-app-code-
)<username>
Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。
「Amazon Kinesis Data Streamsデベロッパーガイド」の「データストリームの作成および更新」 データストリーム
ExampleInputStream
とExampleOutputStream
に名前を付けます。Amazon Simple Storage Service ユーザーガイドの「S3 バケットを作成する方法」を参照してください。ログイン名 (
ka-app-code-
など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。<username>
サンプルレコードを入力ストリームに書き込む
このセクションでは、Python スクリプトを使用して、アプリケーションが処理するランダムな文字列をストリームに書き込みます。
注記
このセクションでは AWS SDK for Python (Boto)
-
次の内容で、
ping.py
という名前のファイルを作成します。import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
ping.py
スクリプトを実行します。$ python ping.py
チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。
アプリケーションコードをダウンロードして調べる
この例の Java アプリケーションコードは、 から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。
Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール
」をご参照ください。 次のコマンドを使用してリモートリポジトリのクローンを作成します。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
amazon-kinesis-data-analytics-java-examples/Beam
ディレクトリに移動します。
アプリケーションコードはBasicBeamStreamingJob.java
ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。
アプリケーションは Apache Beam を使用してParDo
、 というカスタム変換関数を呼び出して受信レコードを処理します PingPongFn
。PingPongFn
関数を呼び出すコードは次のとおりです。.apply("Pong transform", ParDo.of(new PingPongFn())
Apache Beam を使用する Apache Flink アプリケーション用 Managed Serviceには、以下のコンポーネントが必要です。これらのコンポーネントとバージョンを
pom.xml
に含めないと、アプリケーションは環境の依存関係から誤ったバージョンをロードし、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
PingPongFn
変換関数は、入力データが ping でない限り、入力データを出力ストリームに渡します。「ping」である場合は、文字列「pong\n」を出力ストリームに出力します。変換関数のコードは以下のとおりです。
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
アプリケーションコードをコンパイルする
アプリケーションをコンパイルするには、次の操作を行います。
Java と Maven がまだインストールされていない場合は、インストールします。詳細については、チュートリアル: Managed Service for Apache Flink で の使用 DataStream APIを開始するチュートリアルの「必要な前提条件を完了する」を参照してください。
次のコマンドを使用して、アプリケーションをコンパイルします。
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
注記
提供されているソースコードは Java 11 のライブラリーに依存しています。
アプリケーションをコンパイルすると、アプリケーションJARファイル () が作成されますtarget/basic-beam-app-1.0.jar
。
Apache Flink ストリーミング Java コードをアップロードする
このセクションでは、依存リソースを作成する のセクションで作成した Amazon S3 バケットにアプリケーションコードをアップロードします。
-
Amazon S3 コンソールで、 ka-app-code- を選択します。
<username>
バケット を選択し、アップロード を選択します。 -
ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した
basic-beam-app-1.0.jar
ファイルに移動します。 オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。
アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。
Managed Service for Apache Flink アプリケーションを作成して実行する
以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。
アプリケーションの作成
https://console.aws.amazon.com/flink で Managed Service for Apache Flink コンソールを開きます。
-
Managed Service for Apache Flinkのダッシュボードで、「分析アプリケーションの作成」 を選択します。
-
「Managed Service for Apache Flink-アプリケーションの作成」ページで、次のようにアプリケーションの詳細を入力します。
-
[アプリケーション名] には
MyApplication
と入力します。 -
[ランタイム] には、[Apache Flink] を選択します。
注記
Apache Beam は現在、Apache Flink バージョン 1.19 以降と互換性がありません。
バージョンプルダウンから Apache Flink バージョン 1.15 を選択します。
-
-
アクセス許可 で、IAMロールの作成/更新
kinesis-analytics-MyApplication-us-west-2
を選択します。 -
[Create application] を選択します。
注記
コンソールを使用して Managed Service for Apache Flink アプリケーションを作成する場合、アプリケーション用に IAMロールとポリシーを作成するオプションがあります。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらのIAMリソースの名前は、次のようにアプリケーション名とリージョンを使用して付けられます。
-
ポリシー:
kinesis-analytics-service-
MyApplication
-us-west-2
-
ロール:
kinesis-analytics-MyApplication-
us-west-2
IAM ポリシーを編集する
IAM ポリシーを編集して、Kinesis データストリームにアクセスするためのアクセス許可を追加します。
でIAMコンソールを開きますhttps://console.aws.amazon.com/iam/
。 -
[Policies] (ポリシー) を選択します。前のセクションでコンソールによって作成された
kinesis-analytics-service-MyApplication-us-west-2
ポリシーを選択します。 -
[概要] ページで、[ポリシーの編集] を選択します。JSON タブを選択します。
-
次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルアカウント IDs (
012345678901
) を アカウント ID で使用します。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
アプリケーションを設定する
-
MyApplication ページで、 の設定 を選択します。
-
[Configure application] ページで、[Code location] を次のように指定します。
-
[Amazon S3 バケット] で、
ka-app-code-
と入力します。<username>
-
[Amazon S3 オブジェクトへのパス] で、
basic-beam-app-1.0.jar
と入力します。
-
-
「アプリケーションリソースへのアクセス」の「アクセス許可」で、IAM「ロールの作成/更新
kinesis-analytics-MyApplication-us-west-2
」を選択します。 -
次のように入力します。
グループ ID キー 値 BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
[Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。
-
CloudWatch のログ記録では、有効化チェックボックスをオンにします。
-
[Update] (更新) を選択します。
注記
CloudWatch ログ記録を有効にすると、Managed Service for Apache Flink によってロググループとログストリームが作成されます。これらのリソースの名前は次のとおりです。
-
ロググループ:
/aws/kinesis-analytics/MyApplication
-
ログストリーム:
kinesis-analytics-log-stream
このログストリームはアプリケーションのモニターに使用されます。このログストリームは、アプリケーションの結果の送信に使用されたログストリームとは異なります。
アプリケーションを実行する
Flink ジョブグラフは、アプリケーションを実行し、Apache Flink ダッシュボードを開き、目的の Flink ジョブを選択すると表示できます。
CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが動作していることを確認できます。
AWS リソースをクリーンアップする
このセクションでは、「タンブリングウィンドウ」チュートリアルで作成した AWS リソースをクリーンアップする手順について説明します。
このトピックには、次のセクションが含まれています。
Managed Service for Apache Flink アプリケーションを削除する
https://console.aws.amazon.com/flink で Managed Service for Apache Flink コンソールを開きます。
Managed Service for Apache Flink パネルで、 を選択しますMyApplication。
アプリケーションのページで[削除]を選択し、削除を確認します。
Kinesis データストリームを削除する
https://console.aws.amazon.com/kinesis で Kinesis
コンソールを開きます。 Kinesis Data Streams パネルで、 を選択しますExampleInputStream。
ExampleInputStream ページで、Kinesis ストリームの削除を選択し、削除を確定します。
Kinesis ストリームページで、 を選択しExampleOutputStream、アクション を選択し、削除 を選択し、削除を確定します。
Amazon S3 オブジェクトとバケットを削除する
で Amazon S3 コンソールを開きますhttps://console.aws.amazon.com/s3/
。 ka-app-code- を選択します。
<username>
バケット。[削除] を選択し、バケット名を入力して削除を確認します。
IAM リソースを削除する
でIAMコンソールを開きますhttps://console.aws.amazon.com/iam/
。 ナビゲーションバーで、[ポリシー] を選択します。
フィルターコントロールに「kinesis」と入力します。
kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。
[ポリシーアクション]、[削除] の順に選択します。
ナビゲーションバーで [ロール]を選択します。
kinesis-analytics-MyApplication-us-west-2 ロールを選択します。
[ロールの削除] を選択し、削除を確定します。
CloudWatch リソースを削除する
で CloudWatch コンソールを開きますhttps://console.aws.amazon.com/cloudwatch/
。 ナビゲーションバーで [ログ] を選択します。
/aws/kinesis-analytics/MyApplication ロググループを選択します。
[ロググループの削除]を選択し、削除を確認してください。
次のステップ
Apache Beam を使用してデータを変換する基本的なApache Flink アプリケーション用 Managed Service を作成し、実行しますた。次に、より高度なApache Flink ソリューション用 Managed Service の例として次のアプリケーションをご覧ください。
「Apache Flink Streaming Workshop 用 Managed Service上のビーム
」:このワークショップでは、バッチとストリーミングを1つの Apache Beam パイプラインに統合したエンド・ツー・エンドの例について説明します。