Amazon EMR の Zeppelin から Flink ジョブを操作する - Amazon EMR

Amazon EMR の Zeppelin から Flink ジョブを操作する

Amazon EMR リリース 6.10.0 以降では、Apache Zeppelin と Apache Flink の連携がサポートされており、Flink ジョブを Zeppelin ノートブックからインタラクティブに送信できます。Flink インタープリタを使用すると、Flink クエリの実行、Flink ストリーミングおよびバッチのジョブ定義、Zeppelin ノートブック内での出力の視覚化を行えます。Flink インタープリタは、Flink REST API をベースに構築されています。これにより、Zeppelin 環境内から Flink ジョブにアクセスして操作し、データの処理と分析をリアルタイムに実行できます。

Flink インタープリタには 4 つのサブインタープリターがあります。これらは、異なる目的を持ちますが、すべて JVM 内に存在し、事前設定された、Flink への同じエントリポイント (ExecutionEnviromentStreamExecutionEnvironmentBatchTableEnvironmentStreamTableEnvironment) を共有しています。具体的なインタープリタは、次のとおりです。

  • %flinkExecutionEnvironmentStreamExecutionEnvironmentBatchTableEnvironmentStreamTableEnvironment を作成し、Scala 環境を提供する

  • %flink.pyflink — Python 環境を提供する

  • %flink.ssql — ストリーミング SQL 環境を提供する

  • %flink.bsql — バッチ SQL 環境を提供する

Apache Zeppelin で動作する Apache Flink が EMR クラスター上で稼働できるよう設定するには、次の手順に従います。

  1. Amazon EMR コンソールからクラスターを新規作成します。emr-6.10.0 以降の Amazon EMR リリースを選択します。次に、[カスタム] オプションを使用して、アプリケーションバンドルのカスタマイズを選択します。バンドルには、少なくとも Flink、Hadoop、Zeppelin を含めてください。

    Amazon EMR コンソールで、[カスタム] オプションを使用して、アプリケーションバンドルをカスタマイズします。バンドルには、少なくとも Flink、Hadoop、Zeppelin を含めてください。
  2. 残りのクラスターは、任意の設定で作成します。

  3. クラスターを稼働させたら、そのクラスターをコンソールから選択して詳細を表示し、[アプリケーション] タブを開きます。[アプリケーションユーザーインターフェイス] セクションから [Zeppelin] を選択して、Zeppelin ウェブインターフェイスを開きます。前提条件 の説明どおり、プライマリノードへの SSH トンネルとプロキシ接続を介して、Zeppelin ウェブインターフェイスにアクセスできるよう設定されていることを確認します。

    Zeppelin ウェブインターフェイスでは、ノートブックをインポートして新規作成できます。
  4. これで、Flink をデフォルトのインタープリタとして使用して、Zeppelin ノートブックにノートを新規作成できるようになりました。

    Flink をデフォルトのインタープリタとして使用して、Zeppelin ノートブックにノートを新規作成できます。
  5. Zeppelin ノートブックから Flink ジョブを実行する方法については、次のコード例を参照してください。

  • 例 1: Flink Scala

    a) バッチの WordCount の例 (SCALA)

    %flink val data = benv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .groupBy(0) .sum(1) .print()

    b) ストリーミングの WordCount の例 (SCALA)

    %flink val data = senv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .keyBy(0) .sum(1) .print senv.execute()
    例えば、Zeppelin ノートブックから、バッチの WordCount ジョブと、ストリーミングの WordCount ジョブを実行できます。
  • 例 2: Flink ストリーミング SQL

    %flink.ssql SET 'sql-client.execution.result-mode' = 'tableau'; SET 'table.dml-sync' = 'true'; SET 'execution.runtime-mode' = 'streaming'; create table dummy_table ( id int, data string ) with ( 'connector' = 'filesystem', 'path' = 's3://s3-bucket/dummy_table', 'format' = 'csv' ); INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); SELECT * FROM dummy_table;
    この例は、Flink ストリーミング SQL ジョブの実行方法を示しています。
  • 例 3: Pyflink word.txt という名前のサンプルテキストファイルを S3 バケットにアップロードする必要があることに注意してください。

    %flink.pyflink import argparse import logging import sys from pyflink.common import Row from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes, FormatDescriptor) from pyflink.table.expressions import lit, col from pyflink.table.udf import udtf def word_count(input_path, output_path): t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) # write all the data to one file t_env.get_config().set("parallelism.default", "1") # define the source if input_path is not None: t_env.create_temporary_table( 'source', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .build()) .option('path', input_path) .format('csv') .build()) tab = t_env.from_path('source') else: print("Executing word_count example with default input data set.") print("Use --input to specify file input.") tab = t_env.from_elements(map(lambda i: (i,), word_count_data), DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())])) # define the sink if output_path is not None: t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .option('path', output_path) .format(FormatDescriptor.for_format('canal-json') .build()) .build()) else: print("Printing result to stdout. Use --output to specify output path.") t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('print') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .build()) @udtf(result_types=[DataTypes.STRING()]) def split(line: Row): for s in line[0].split(): yield Row(s) # compute word count tab.flat_map(split).alias('word') \ .group_by(col('word')) \ .select(col('word'), lit(1).count) \ .execute_insert('sink') \ .wait() logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  1. Zeppelin UI で [FLINK JOB] を選択すると、Flink ウェブの UI にアクセスし、表示を行えます。

    Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".
  2. [FLINK JOB] を選択すると、Flink ウェブコンソールにルーティングされ、そのページがブラウザの別のタブで開きます。

    [FLINK JOB] を選択すると、Flink ウェブコンソールがブラウザの別のタブで開きます。