Amazon での Flink の設定 EMR - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon での Flink の設定 EMR

Amazon EMR リリース 6.9.0 以降では、Apache Flink コネクタを使用して Hive Metastore と AWS Glue Catalog の両方がサポートされています。このセクションでは、AWS Glue CatalogHive Metastore を Flink で使用するために必要な手順について概説します。

  1. リリース 6.9.0 以降で、HiveFlink の 2 つ以上のアプリケーションを持つEMRクラスターを作成します。

  2. スクリプトランナーを使用して、次のスクリプトをステップ関数として実行します。

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. リリース 6.9.0 以降で、HiveFlink の 2 つ以上のアプリケーションを持つEMRクラスターを作成します。

  2. AWS Glue Data Catalog の設定にある [Hive テーブルメタデータに使用] を選択して、クラスターでデータカタログを有効にします。

  3. スクリプトランナーを使用して、次のスクリプトをステップ関数として実行します。Amazon EMRクラスター でコマンドとスクリプトを実行します

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Amazon EMR設定を使用してAPI、設定ファイルを使用して Flink を設定できます。内で設定可能なファイルAPIは次のとおりです。

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

flink-conf.yaml は、Flink の主要な設定ファイルです。

Flink に使用するタスクスロットの数を AWS CLIから設定するには
  1. 次のコンテンツを含む configurations.json ファイルを作成します。

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 次に、次の設定でクラスターを作成します。

    aws emr create-cluster --release-label emr-7.3.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
注記

Flink を使用して一部の設定を変更することもできますAPI。詳細については、Flink のドキュメントで「Concepts (概念)」を確認してください。

Amazon EMRバージョン 5.21.0 以降では、クラスター設定を上書きし、実行中のクラスター内のインスタンスグループごとに追加の設定分類を指定できます。これは、Amazon EMRコンソール、 AWS Command Line Interface (AWS CLI)、または を使用して行います AWS SDK。詳細については、「実行中のクラスター内のインスタンスグループの設定を指定する」を参照してください。

アプリケーションの所有者であれば、Flink 内のタスクにどのリソースを割り当てるべきかを熟知しているでしょう。このドキュメントの例では、アプリケーションに使用するスレーブインスタンスと同じ数のタスクを使用します。初期レベルの並列処理では、多くの場合、そうすることをお勧めしますが、タスクスロットを使用して並列処理の粒度を高めることも可能です。ただし、一般的には、並列処理がインスタンスあたりの仮想コア数 (性能) を超えないようにする必要があります。Flink アーキテクチャの詳細については、Flink ドキュメントの「Concepts」を参照してください。

Flink JobManager の は、複数のプライマリノードを持つ Amazon EMRクラスターのプライマリノードフェイルオーバープロセス中に引き続き使用できます。Amazon EMR 5.28.0 以降、 JobManager 高可用性も自動的に有効になります。手動設定は必要ありません。

Amazon EMRバージョン 5.27.0 以前では、 JobManager は単一障害点です。が JobManager 失敗すると、すべてのジョブ状態が失われ、実行中のジョブは再開されません。次の例に示すように、アプリケーションの試行数、チェックポイント、Flink の状態ストレージ ZooKeeper として を有効にすることで、 JobManager 高可用性を有効にすることができます。

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

Flink の最大アプリケーションマスター試行YARN回数とアプリケーション試行回数の両方を設定する必要があります。詳細については、YARN「クラスターの高可用性の設定」を参照してください。また、Flink チェックポイントを設定して、以前に完了したチェックポイントから JobManager 実行中のジョブを復旧するようにすることもできます。詳細については、「Flink のチェックポイント」を参照してください。

Flink 1.11.x を使用する Amazon EMRバージョンでは、 で (jobmanager.memory.process.size) と TaskManager (taskmanager.memory.process.size) の両方 JobManagerの合計メモリプロセスサイズを設定する必要がありますflink-conf.yaml。これらの値は、 設定でクラスターを設定するか、 API を使用してこれらのフィールドのコメントを手動で解除することで設定できますSSH。Flink には、次のデフォルト値が設定されています。

  • jobmanager.memory.process.size: 1600m

  • taskmanager.memory.process.size: 1728m

JVM メタスペースとオーバーヘッドを除外するには、 の代わりに合計 Flink メモリサイズ (taskmanager.memory.flink.size) を使用しますtaskmanager.memory.process.sizetaskmanager.memory.process.size のデフォルト値は 1280m です。taskmanager.memory.process.sizetaskmanager.memory.process.size の両方を設定することはお勧めしません。

Flink 1.12.0 以降を使用するすべての Amazon EMRバージョンでは、Flink のオープンソースセットにデフォルト値が Amazon のデフォルト値としてリストされているためEMR、自分で設定する必要はありません。

Flink アプリケーションコンテナは、.out ファイル、.log ファイル、.err ファイルの 3 種類のログファイルを作成して書き込みます。.err ファイルのみが圧縮されてファイルシステムから削除され、.log および .out ログファイルはファイルシステムに残ります。これらの出力ファイルを管理しやすく、クラスターを安定させるために、log4j.properties のログローテーションを設定して、ファイルの最大数を設定し、それらのサイズを制限できます。

Amazon EMRバージョン 5.30.0 以降

Amazon EMR 5.30.0 以降、Flink は設定分類名で log4j2 ログ記録フレームワークを使用します。次の設定例は log4j2 flink-log4j. 形式を示しています。

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMRバージョン 5.29.0 以前

Amazon EMRバージョン 5.29.0 以前では、Flink は log4j ログ記録フレームワークを使用します。次の設定例は、log4j 形式を示しています。

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

Amazon EMRリリース 6.12.0 以降では、Flink の Java 11 ランタイムサポートが提供されます。このセクションでは、Java 11 ランタイム環境で Flink を実行するための設定について説明します。

Flink と Java 11 ランタイムを使用してEMRクラスターを作成するには、次のステップを実行します。Java 11 ランタイム環境を使用できるようにする設定ファイルは flink-conf.yaml です。

Console
コンソールで Flink と Java 11 ランタイムを使用してクラスターを作成するには
  1. にサインインし AWS Management Console、https://console.aws.amazon.com/emr で Amazon EMRコンソールを開きます。

  2. ナビゲーションペインの EMRの EC2クラスター を選択し、クラスター を作成します

  3. Amazon EMRリリース 6.12.0 以降を選択し、Flink アプリケーションのインストールを選択します。クラスターにインストールする他のアプリケーションを選択します。

  4. クラスターの設定をそのまま進めます。オプションの [ソフトウェア設定] セクションで、デフォルトの [設定の入力] オプションを使用して、次の設定を入力します。

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. 続けてクラスターの設定と起動を行います。

AWS CLI
から Flink と Java 11 ランタイムを使用してクラスターを作成するには CLI
  1. Flink を Java 11 で実行する設定ファイル configurations.json を作成する

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. から AWS CLI、Amazon EMRリリース 6.12.0 以降で新しいEMRクラスターを作成し、次の例に示すように Flink アプリケーションをインストールします。

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

Flink と Java 11 ランタイムで実行中のEMRクラスターを更新するには、次の手順を実行します。Java 11 ランタイム環境を使用できるようにする設定ファイルは flink-conf.yaml です。

Console
コンソールで Flink と Java 11 ランタイムを使用して実行中のクラスターを更新するには
  1. にサインインし AWS Management Console、https://console.aws.amazon.com/emr で Amazon EMRコンソールを開きます。

  2. ナビゲーションペインの EMRの EC2クラスターを選択し、更新するクラスターを選択します。

    注記

    クラスターは Java 11 をサポートするには、Amazon EMRリリース 6.12.0 以降を使用する必要があります。

  3. [設定] タブを選択します。

  4. [インスタンスグループ設定] セクションで、更新する [実行中] のインスタンスグループを選択し、リストアクションメニューから [再設定] を選択します。

  5. [属性の編集] オプションを使用してインスタンスグループを次のように再設定します。各設定の後に、[新しい設定の追加] を選択します。

    分類 プロパティ

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. [変更の保存] を選択して設定を追加します。

AWS CLI
から Flink と Java 11 ランタイムを使用するように実行中のクラスターを更新するには CLI

modify-instance-groups コマンドを使用して、実行中のクラスター内のインスタンスグループに新しい設定を指定します。

  1. 最初に、設定ファイル configurations.json を作成し、Java 11 を使用できるようにします。次の例では、ig-1xxxxxxx9 再設定するインスタンスグループの ID を指定します。そのファイルを、modify-instance-groups コマンドを実行するディレクトリに保存します。

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. から AWS CLI、次のコマンドを実行します。再設定するインスタンスグループの ID を置き換えます。

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

実行中のクラスターの Java ランタイムを確認するには、「 を使用してプライマリノードに接続する」の説明SSHに従って、 を使用してプライマリノードSSHにログインします。次に、以下のコマンドを実行します。

ps -ef | grep flink

ps コマンドで -ef オプションを指定すると、システム上で実行中のプロセスをすべて一覧表示できます。その出力を grep でフィルタリングすると、flink の文字列が含まれる行を検索できます。Java Runtime Environment (JRE) 値の出力を確認しますjre-XX。次の出力の jre-11 は、Flink のランタイム環境として Java 11 が選択されることを示しています。

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

または、 を使用してプライマリノードにログインSSHし、 コマンド を使用して Flink YARNセッションを開始しますflink-yarn-session -d。出力は、次の例java-11-amazon-correttoの Flink の Java 仮想マシン (JVM) を示しています。

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64