Apache Beam を使用してアプリケーションを作成する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Beam を使用してアプリケーションを作成する

この課題では、「Apache Beam」を使用してデータを変換する Apache Flink アプリケーション用 Managed Serviceを作成します。Apache Beam はストリーミングデータを処理するためのプログラミングモデルです。Apache Flink 用 Managed Service での Apache Beam の使用ついては、 Managed Service for Apache Flink アプリケーションで Apache Beam を使用する を参照してください。

注記

この演習に必要な前提条件を設定するには、まずチュートリアル: Managed Service for Apache Flink で の使用 DataStream APIを開始する演習を完了してください。

依存リソースを作成する

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。

  • 2 つの Kinesis Data Streams (ExampleInputStreamExampleOutputStream)

  • アプリケーションのコードを保存するためのAmazon S3バケット (ka-app-code-<username>)

Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。

  • 「Amazon Kinesis Data Streamsデベロッパーガイド」の「データストリームの作成および更新」 データストリームExampleInputStreamExampleOutputStreamに名前を付けます。

  • Amazon Simple Storage Service ユーザーガイドの「S3 バケットを作成する方法」を参照してください。ログイン名 (ka-app-code-<username> など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。

サンプルレコードを入力ストリームに書き込む

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するランダムな文字列をストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

  1. 次の内容で、ping.py という名前のファイルを作成します。

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. ping.py スクリプトを実行します。

    $ python ping.py

    チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。

アプリケーションコードをダウンロードして調べる

この例の Java アプリケーションコードは、 から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール」をご参照ください。

  2. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. amazon-kinesis-data-analytics-java-examples/Beam ディレクトリに移動します。

アプリケーションコードはBasicBeamStreamingJob.javaファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。

  • アプリケーションは Apache Beam を使用してParDo、 というカスタム変換関数を呼び出して受信レコードを処理しますPingPongFn

    PingPongFn 関数を呼び出すコードは次のとおりです。

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Apache Beam を使用する Apache Flink アプリケーション用 Managed Serviceには、以下のコンポーネントが必要です。これらのコンポーネントとバージョンを pom.xml に含めないと、アプリケーションは環境の依存関係から誤ったバージョンをロードし、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • PingPongFn 変換関数は、入力データが ping でない限り、入力データを出力ストリームに渡します。「ping」である場合は、文字列「pong\n」を出力ストリームに出力します。

    変換関数のコードは以下のとおりです。

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

アプリケーションコードをコンパイルする

アプリケーションをコンパイルするには、次の操作を行います。

  1. Java と Maven がまだインストールされていない場合は、インストールします。詳細については、チュートリアル: Managed Service for Apache Flink で の使用 DataStream APIを開始するチュートリアルの「必要な前提条件を完了する」を参照してください。

  2. 次のコマンドを使用して、アプリケーションをコンパイルします。

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    注記

    提供されているソースコードは Java 11 のライブラリーに依存しています。

アプリケーションをコンパイルすると、アプリケーションJARファイル () が作成されますtarget/basic-beam-app-1.0.jar

Apache Flink ストリーミング Java コードをアップロードする

このセクションでは、依存リソースを作成する のセクションで作成した Amazon S3 バケットにアプリケーションコードをアップロードします。

  1. Amazon S3 コンソールで、 ka-app-code- を選択します。<username> バケット を選択し、アップロード を選択します。

  2. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した basic-beam-app-1.0.jar ファイルに移動します。

  3. オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

Managed Service for Apache Flink アプリケーションを作成して実行する

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

アプリケーションの作成

  1. https://console.aws.amazon.com/flink で Managed Service for Apache Flink コンソールを開きます。

  2. Managed Service for Apache Flinkのダッシュボードで、「分析アプリケーションの作成」 を選択します。

  3. Managed Service for Apache Flink-アプリケーションの作成」ページで、次のようにアプリケーションの詳細を入力します。

    • [アプリケーション名] には MyApplication と入力します。

    • [ランタイム] には、[Apache Flink] を選択します。

      注記

      Apache Beam は現在、Apache Flink バージョン 1.19 以降と互換性がありません。

    • バージョンプルダウンから Apache Flink バージョン 1.15 を選択します。

  4. アクセス許可 でIAMロールの作成/更新 kinesis-analytics-MyApplication-us-west-2を選択します。

  5. [Create application] を選択します。

注記

コンソールを使用して Managed Service for Apache Flink アプリケーションを作成する場合、アプリケーション用に IAMロールとポリシーを作成するオプションがあります。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらのIAMリソースの名前は、次のようにアプリケーション名とリージョンを使用して付けられます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesis-analytics-MyApplication-us-west-2

IAM ポリシーを編集する

IAM ポリシーを編集して、Kinesis データストリームにアクセスするためのアクセス許可を追加します。

  1. でIAMコンソールを開きますhttps://console.aws.amazon.com/iam/

  2. [Policies] (ポリシー) を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  3. [概要] ページで、[ポリシーの編集] を選択します。JSON タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルアカウント IDs (012345678901) を アカウント ID で使用します。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アプリケーションを設定する

  1. MyApplication ページで、 の設定 を選択します。

  2. [Configure application] ページで、[Code location] を次のように指定します。

    • [Amazon S3 バケット] で、ka-app-code-<username>と入力します。

    • [Amazon S3 オブジェクトへのパス] で、basic-beam-app-1.0.jarと入力します。

  3. 「アプリケーションリソースへのアクセス」の「アクセス許可」で、IAM「ロールの作成/更新kinesis-analytics-MyApplication-us-west-2」を選択します。

  4. 次のように入力します。

    グループ ID キー
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. [Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。

  6. CloudWatch のログ記録では、有効化チェックボックスをオンにします。

  7. [Update] (更新) を選択します。

注記

CloudWatch ログ記録を有効にすると、Managed Service for Apache Flink によってロググループとログストリームが作成されます。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

このログストリームはアプリケーションのモニターに使用されます。このログストリームは、アプリケーションの結果の送信に使用されたログストリームとは異なります。

アプリケーションを実行する

Flink ジョブグラフは、アプリケーションを実行し、Apache Flink ダッシュボードを開き、目的の Flink ジョブを選択すると表示できます。

CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが動作していることを確認できます。

AWS リソースをクリーンアップする

このセクションでは、「タンブリングウィンドウ」チュートリアルで作成した AWS リソースをクリーンアップする手順について説明します。

Managed Service for Apache Flink アプリケーションを削除する

  1. https://console.aws.amazon.com/flink で Managed Service for Apache Flink コンソールを開きます。

  2. Managed Service for Apache Flink パネルで、 を選択しますMyApplication

  3. アプリケーションのページで[削除]を選択し、削除を確認します。

Kinesis データストリームを削除する

  1. https://console.aws.amazon.com/kinesis で Kinesis コンソールを開きます。

  2. Kinesis Data Streams パネルで、 を選択しますExampleInputStream

  3. ExampleInputStream ページで、Kinesis ストリームの削除を選択し、削除を確定します。

  4. Kinesis ストリームページで、 を選択しExampleOutputStreamアクション を選択し、削除 を選択し、削除を確定します。

Amazon S3 オブジェクトとバケットを削除する

  1. で Amazon S3 コンソールを開きますhttps://console.aws.amazon.com/s3/

  2. ka-app-code- を選択します。<username> バケット。

  3. [削除] を選択し、バケット名を入力して削除を確認します。

IAM リソースを削除する

  1. でIAMコンソールを開きますhttps://console.aws.amazon.com/iam/

  2. ナビゲーションバーで、[ポリシー] を選択します。

  3. フィルターコントロールに「kinesis」と入力します。

  4. kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  5. [ポリシーアクション]、[削除] の順に選択します。

  6. ナビゲーションバーで [ロール]を選択します。

  7. kinesis-analytics-MyApplication-us-west-2 ロールを選択します。

  8. [ロールの削除] を選択し、削除を確定します。

CloudWatch リソースを削除する

  1. で CloudWatch コンソールを開きますhttps://console.aws.amazon.com/cloudwatch/

  2. ナビゲーションバーで [ログ] を選択します。

  3. /aws/kinesis-analytics/MyApplication ロググループを選択します。

  4. [ロググループの削除]を選択し、削除を確認してください。

次のステップ

Apache Beam を使用してデータを変換する基本的なApache Flink アプリケーション用 Managed Service を作成し、実行しますた。次に、より高度なApache Flink ソリューション用 Managed Service の例として次のアプリケーションをご覧ください。