Arbeiten mit Flink-Aufträgen von Zeppelin in Amazon EMR - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Arbeiten mit Flink-Aufträgen von Zeppelin in Amazon EMR

Amazon-EMR-Versionen 6.10.0 und höher unterstützen die Apache Zeppelin-Integration mit Apache Flink. Sie können Flink-Aufträge interaktiv über Zeppelin-Notebooks einreichen. Mit dem Flink-Interpreter können Sie Flink-Abfragen ausführen, Flink-Streaming- und Batch-Aufträge definieren und die Ausgabe in Zeppelin-Notebooks visualisieren. Der Flink-Interpreter basiert auf der Flink-REST-API. Auf diese Weise können Sie von der Zeppelin-Umgebung aus auf Flink-Aufträge zugreifen und diese bearbeiten, um eine Datenverarbeitung und -analyse in Echtzeit durchzuführen.

In Flink Interpreter gibt es vier Unterinterpreter. Sie dienen unterschiedlichen Zwecken, befinden sich aber alle in der JVM und teilen sich dieselben vorkonfigurierten Einstiegspunkte zu Flink (ExecutionEnviroment, StreamExecutionEnvironment, BatchTableEnvironment, StreamTableEnvironment). Die Interpreter sind wie folgt:

  • %flink – Erzeugt ExecutionEnvironment, StreamExecutionEnvironment, BatchTableEnvironment, StreamTableEnvironment und stellt eine Scala-Umgebung bereit

  • %flink.pyflink – Stellt eine Python-Umgebung bereit

  • %flink.ssql – Stellt eine Streaming-SQL-Umgebung bereit

  • %flink.bsql – Stellt eine Batch-SQL-Umgebung bereit

Gehen Sie wie folgt vor, um Apache Flink auf Apache Zeppelin für die Ausführung auf einem EMR-Cluster zu konfigurieren:

  1. Erstellen Sie einen neuen Cluster von der Amazon-EMR-Konsole aus. Wählen Sie emr-6.10.0 oder höher für die Amazon-EMR-Version aus. Wählen Sie dann, ob Sie Ihr Anwendungspaket mit der Option Benutzerdefiniert anpassen möchten. Nehmen Sie mindestens Flink, Hadoop und Zeppelin in Ihr Paket auf.

    Passen Sie in der Amazon-EMR-Konsole Ihr Anwendungspaket mit der Option Benutzerdefiniert an. Mindestens Flink, Hadoop und Zeppelin in Ihr Paket aufnehmen
  2. Erstellen Sie den Rest Ihres Clusters mit den Einstellungen, die Sie bevorzugen.

  3. Sobald Ihr Cluster läuft, wählen Sie den Cluster in der Konsole aus, um seine Details anzuzeigen, und öffnen Sie die Registerkarte Anwendungen. Wählen Sie Zeppelin im Bereich Benutzeroberflächen für Anwendungen aus, um die Zeppelin-Weboberfläche zu öffnen. Stellen Sie sicher, dass Sie den Zugriff auf die Zeppelin-Weboberfläche mit einem SSH-Tunnel zum Primärknoten und einer Proxyverbindung eingerichtet haben, wie in Voraussetzungen beschrieben.

    Auf der Zeppelin-Weboberfläche können Sie neue Notebooks importieren und erstellen.
  4. Jetzt können Sie eine neue Notiz in einem Zeppelin-Notebook mit Flink als Standardinterpreter erstellen.

    Sie können eine neue Notiz in einem Zeppelin-Notebook mit Flink als Standardinterpreter erstellen.
  5. In den folgenden Codebeispielen wird veranschaulicht, wie Flink-Jobs von einem Zeppelin-Notebook aus ausgeführt werden.

  • Beispiel 1, Flink Scala

    a) Batch- WordCount Beispiel (SCALA)

    %flink val data = benv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .groupBy(0) .sum(1) .print()

    b) Streaming- WordCount Beispiel (SCALA)

    %flink val data = senv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .keyBy(0) .sum(1) .print senv.execute()
    Sie können beispielsweise Batch- WordCount und Streaming- WordCountAufträge von einem Zeppelin-Notebook aus ausführen.
  • Beispiel 2, Flink Streaming SQL

    %flink.ssql SET 'sql-client.execution.result-mode' = 'tableau'; SET 'table.dml-sync' = 'true'; SET 'execution.runtime-mode' = 'streaming'; create table dummy_table ( id int, data string ) with ( 'connector' = 'filesystem', 'path' = 's3://s3-bucket/dummy_table', 'format' = 'csv' ); INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); SELECT * FROM dummy_table;
    Dieses Beispiel zeigt, wie ein Flink-Streaming-SQL-Auftrag ausgeführt wird.
  • Beispiel 3, Pyflink. Beachten Sie, dass Sie Ihre eigene Beispieltextdatei mit dem Namen word.txt in Ihren S3-Bucket hochladen müssen.

    %flink.pyflink import argparse import logging import sys from pyflink.common import Row from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes, FormatDescriptor) from pyflink.table.expressions import lit, col from pyflink.table.udf import udtf def word_count(input_path, output_path): t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) # write all the data to one file t_env.get_config().set("parallelism.default", "1") # define the source if input_path is not None: t_env.create_temporary_table( 'source', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .build()) .option('path', input_path) .format('csv') .build()) tab = t_env.from_path('source') else: print("Executing word_count example with default input data set.") print("Use --input to specify file input.") tab = t_env.from_elements(map(lambda i: (i,), word_count_data), DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())])) # define the sink if output_path is not None: t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .option('path', output_path) .format(FormatDescriptor.for_format('canal-json') .build()) .build()) else: print("Printing result to stdout. Use --output to specify output path.") t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('print') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .build()) @udtf(result_types=[DataTypes.STRING()]) def split(line: Row): for s in line[0].split(): yield Row(s) # compute word count tab.flat_map(split).alias('word') \ .group_by(col('word')) \ .select(col('word'), lit(1).count) \ .execute_insert('sink') \ .wait() logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  1. Wählen Sie FLINK JOB in der Zeppelin-Benutzeroberfläche, um auf die Flink-Web-UI zuzugreifen und diese anzusehen.

    Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".
  2. Wenn Sie FLINK JOB wählen, gelangen Sie zur Flink Web Console in einer anderen Registerkarte Ihres Browsers.

    Wenn Sie FLINK JOB wählen, wird die Flink Web Console in einer anderen Registerkarte Ihres Browsers geöffnet.