Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

Amazon EMR の Zeppelin から Flink ジョブを操作する - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon EMR の Zeppelin から Flink ジョブを操作する

Amazon EMR リリース 6.10.0 以降では、Apache Zeppelin と Apache Flink の連携がサポートされており、Flink ジョブを Zeppelin ノートブックからインタラクティブに送信できます。Flink インタープリタを使用すると、Flink クエリの実行、Flink ストリーミングおよびバッチのジョブ定義、Zeppelin ノートブック内での出力の視覚化を行えます。Flink インタープリタは、Flink REST API をベースに構築されています。これにより、Zeppelin 環境内から Flink ジョブにアクセスして操作し、データの処理と分析をリアルタイムに実行できます。

Flink インタープリタには 4 つのサブインタープリターがあります。これらは、異なる目的を持ちますが、すべて JVM 内に存在し、事前設定された、Flink への同じエントリポイント (ExecutionEnviromentStreamExecutionEnvironmentBatchTableEnvironmentStreamTableEnvironment) を共有しています。具体的なインタープリタは、次のとおりです。

  • %flinkExecutionEnvironmentStreamExecutionEnvironmentBatchTableEnvironmentStreamTableEnvironment を作成し、Scala 環境を提供する

  • %flink.pyflink — Python 環境を提供する

  • %flink.ssql — ストリーミング SQL 環境を提供する

  • %flink.bsql — バッチ SQL 環境を提供する

Apache Zeppelin で動作する Apache Flink が EMR クラスター上で稼働できるよう設定するには、次の手順に従います。

  1. Amazon EMR コンソールからクラスターを新規作成します。emr-6.10.0 以降の Amazon EMR リリースを選択します。次に、[カスタム] オプションを使用して、アプリケーションバンドルのカスタマイズを選択します。バンドルには、少なくとも Flink、Hadoop、Zeppelin を含めてください。

    Amazon EMR コンソールで、[カスタム] オプションを使用して、アプリケーションバンドルをカスタマイズします。バンドルには、少なくとも Flink、Hadoop、Zeppelin を含めてください。
  2. 残りのクラスターは、任意の設定で作成します。

  3. クラスターを稼働させたら、そのクラスターをコンソールから選択して詳細を表示し、[アプリケーション] タブを開きます。[アプリケーションユーザーインターフェイス] セクションから [Zeppelin] を選択して、Zeppelin ウェブインターフェイスを開きます。前提条件 の説明どおり、プライマリノードへの SSH トンネルとプロキシ接続を介して、Zeppelin ウェブインターフェイスにアクセスできるよう設定されていることを確認します。

    Zeppelin ウェブインターフェイスでは、ノートブックをインポートして新規作成できます。
  4. これで、Flink をデフォルトのインタープリタとして使用して、Zeppelin ノートブックにノートを新規作成できるようになりました。

    Flink をデフォルトのインタープリタとして使用して、Zeppelin ノートブックにノートを新規作成できます。
  5. Zeppelin ノートブックから Flink ジョブを実行する方法については、次のコード例を参照してください。

  • 例 1: Flink Scala

    a) バッチの WordCount の例 (SCALA)

    %flink val data = benv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .groupBy(0) .sum(1) .print()

    b) ストリーミングの WordCount の例 (SCALA)

    %flink val data = senv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .keyBy(0) .sum(1) .print senv.execute()
    例えば、Zeppelin ノートブックから、バッチの WordCount ジョブと、ストリーミングの WordCount ジョブを実行できます。
  • 例 2: Flink ストリーミング SQL

    %flink.ssql SET 'sql-client.execution.result-mode' = 'tableau'; SET 'table.dml-sync' = 'true'; SET 'execution.runtime-mode' = 'streaming'; create table dummy_table ( id int, data string ) with ( 'connector' = 'filesystem', 'path' = 's3://s3-bucket/dummy_table', 'format' = 'csv' ); INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); SELECT * FROM dummy_table;
    この例は、Flink ストリーミング SQL ジョブの実行方法を示しています。
  • 例 3: Pyflink word.txt という名前のサンプルテキストファイルを S3 バケットにアップロードする必要があることに注意してください。

    %flink.pyflink import argparse import logging import sys from pyflink.common import Row from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes, FormatDescriptor) from pyflink.table.expressions import lit, col from pyflink.table.udf import udtf def word_count(input_path, output_path): t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) # write all the data to one file t_env.get_config().set("parallelism.default", "1") # define the source if input_path is not None: t_env.create_temporary_table( 'source', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .build()) .option('path', input_path) .format('csv') .build()) tab = t_env.from_path('source') else: print("Executing word_count example with default input data set.") print("Use --input to specify file input.") tab = t_env.from_elements(map(lambda i: (i,), word_count_data), DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())])) # define the sink if output_path is not None: t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .option('path', output_path) .format(FormatDescriptor.for_format('canal-json') .build()) .build()) else: print("Printing result to stdout. Use --output to specify output path.") t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('print') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .build()) @udtf(result_types=[DataTypes.STRING()]) def split(line: Row): for s in line[0].split(): yield Row(s) # compute word count tab.flat_map(split).alias('word') \ .group_by(col('word')) \ .select(col('word'), lit(1).count) \ .execute_insert('sink') \ .wait() logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  1. Zeppelin UI で [FLINK JOB] を選択すると、Flink ウェブの UI にアクセスし、表示を行えます。

    Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".
  2. [FLINK JOB] を選択すると、Flink ウェブコンソールにルーティングされ、そのページがブラウザの別のタブで開きます。

    [FLINK JOB] を選択すると、Flink ウェブコンソールがブラウザの別のタブで開きます。
プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.