在 Amazon 中設定 Flink EMR - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Amazon 中設定 Flink EMR

Amazon 6.9.0 版和更新EMR版本支援 Hive Metastore 和 AWS Glue Catalog 搭配 Hive 的 Apache Flink 連接器。本章節概述了使用 Flink 設定 AWS Glue CatalogHive 中繼存放區所需的步驟。

  1. 建立具有 6.9.0 版或更新版本的EMR叢集,以及至少兩個應用程式:HiveFlink

  2. 使用指令碼執行器將下列指令碼作為 Step Functions 執行:

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. 建立具有 6.9.0 版或更新版本的EMR叢集,以及至少兩個應用程式:HiveFlink

  2. 在 AWS Glue Data Catalog 設定中選取用於 Hive 資料表中繼資料,以在叢集中啟用 Data Catalog。

  3. 使用指令碼執行器執行下列指令碼做為步驟函數:在 Amazon EMR叢集上執行命令和指令碼

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

您可以使用 Amazon EMR組態API來設定具有組態檔案的 Flink。可在 內設定的檔案API包括:

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Flink 的主要組態檔案名為 flink-conf.yaml

從 AWS CLI設定用於 Flink 的任務位置數量
  1. 使用下列內容建立檔案 configurations.json

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 再以下列組態建立叢集:

    aws emr create-cluster --release-label emr-7.5.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
注意

您也可以使用 Flink 變更某些組態API。如需詳細資訊,請參閱 Flink 文件中的概念

使用 Amazon 5.21.0 版及更新EMR版本,您可以覆寫叢集組態,並為執行中叢集中的每個執行個體群組指定其他組態分類。您可以使用 Amazon EMR主控台、 AWS Command Line Interface (AWS CLI) 或 來執行此操作 AWS SDK。如需詳細資訊,請參閱為執行中叢集的執行個體群組提供組態

作為自己應用程式的擁有者,您最清楚應在 Flink 內將哪些資源指派給任務。如需本文中的範例,請使用與您用於應用程式的任務執行個體相同數量的任務。一般而言,我們會建議在執行初始等級的平行處理時採用此設定,但您也可以用任務位置來增加平行處理的精細度;通常不應超過每個執行個體的虛擬核心數量。如需有關 Flink 架構的詳細資訊,請參閱 Flink 文件中的概念

在具有多個主節點 JobManager 的 Amazon EMR叢集中,Flink 的主要節點容錯移轉程序期間, 的 仍然可用。從 Amazon EMR 5.28.0 開始,也會自動啟用 JobManager 高可用性。不需要手動設定。

對於 Amazon 5.27.0 版或更早EMR版本, JobManager 是單一故障點。當 JobManager 失敗時,它會失去所有任務狀態,且不會繼續執行中的任務。您可以透過設定應用程式嘗試計數、檢查點和啟用 ZooKeeper 做為 Flink 的狀態儲存來啟用 JobManager 高可用性,如下列範例所示:

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

您必須設定 Flink 的應用程式主要嘗試次數上限YARN和應用程式嘗試次數上限。如需詳細資訊,請參閱YARN叢集高可用性的組態。您可能也想要設定 Flink 檢查點,以便從先前完成的檢查點重新啟動 JobManager 復原執行中的任務。如需詳細資訊,請參閱 Flink 設定檢查點

對於使用 Flink 1.11.x 的 Amazon EMR版本,您必須在 中設定 JobManager(jobmanager.memory.process.size) 和 TaskManager (taskmanager.memory.process.size) 的總記憶體程序大小。 flink-conf.yaml您可以使用 組態來設定叢集,API或透過 手動取消註解這些欄位,藉此設定這些值SSH。Flink 提供了下列預設值。

  • jobmanager.memory.process.size:1600m

  • taskmanager.memory.process.size:1728m

若要排除中繼空間和額外負荷,請使用總 Flink 記憶體大小 JVM (taskmanager.memory.flink.size),而非 taskmanager.memory.process.sizetaskmanager.memory.process.size 的預設值為 1280m。不建議同時設定 taskmanager.memory.process.sizetaskmanager.memory.process.size

所有使用 Flink 1.12.0 和更新版本 的 Amazon EMR版本,在 Flink 的開放原始碼集中,都會將預設值列為 Amazon 的預設值EMR,因此您不需要自行設定。

Flink 應用程式容器會建立並寫入三種類型的日誌檔案:.out 檔案、.log 檔案和 .err 檔案。僅 .err 檔案被壓縮並從檔案系統中移除,而 .log.out 日誌檔案仍保留在檔案系統中。為了確保這些輸出檔案保持可管理且叢集保持穩定,您可以在 log4j.properties 中設定日誌輪換以設定檔案數量上限並限制大小。

Amazon 5.30.0 版及更新EMR版本

從 Amazon EMR 5.30.0 開始,Flink 使用 log4j2 記錄架構搭配組態分類名稱flink-log4j.。下列範例組態示範 log4j2 格式。

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon 5.29.0 版及更早EMR版本

對於 Amazon 5.29.0 版及更早EMR版本,Flink 使用 log4j 記錄架構。下列範例組態示範了 log4j 格式。

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

Amazon 6.12.0 版及更新EMR版本提供 Flink 的 Java 11 執行期支援。下列各章節描述如何設定叢集,以為 Flink 提供 Java 11 執行期支援。

使用下列步驟建立具有 Flink 和 Java 11 執行時間的EMR叢集。您在其中新增 Java 11 執行期支援的組態檔案是 flink-conf.yaml

Console
在主控台中建立具有 Flink 和 Java 11 執行時間的叢集
  1. 登入 AWS Management Console,並在 https://console.aws.amazon.com/emr 開啟 Amazon EMR主控台。

  2. 在導覽窗格中的 EMR EC2下選擇叢集,然後選擇建立叢集

  3. 選取 Amazon 6EMR.12.0 版或更新版本,然後選擇安裝 Flink 應用程式。選取您要在您的叢集上安裝的任何其他應用程式。

  4. 繼續設定您的叢集。在選用軟體設定區段中,使用預設輸入組態選項,然後輸入下列組態:

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. 繼續設定並啟動您的叢集。

AWS CLI
從 建立具有 Flink 和 Java 11 執行時間的叢集 CLI
  1. 建立將 Flink 設定為使用 Java 11 的組態檔案 configurations.json

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. 從 中 AWS CLI,使用 Amazon 6.12.0 EMR 或更新版本建立新的EMR叢集,並安裝 Flink 應用程式,如下列範例所示:

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

使用下列步驟更新執行中EMR叢集的 Flink 和 Java 11 執行時間。您在其中新增 Java 11 執行期支援的組態檔案是 flink-conf.yaml

Console
在主控台中使用 Flink 和 Java 11 執行時間更新執行中的叢集
  1. 登入 AWS Management Console,並在 https://console.aws.amazon.com/emr 開啟 Amazon EMR主控台。

  2. 在導覽窗格中EMR的 EC2下選擇叢集,然後選擇您要更新的叢集。

    注意

    叢集必須使用 Amazon 6EMR.12.0 版或更新版本來支援 Java 11。

  3. 選取組態標籤。

  4. 執行個體群組組態區段中,選取您要更新的執行中執行個體群組,然後從清單動作功能表中選擇重新設定

  5. 使用編輯屬性選項重新設定執行個體群組,如下所示。在每一項之後選取新增組態

    分類 屬性 Value

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. 選取儲存變更以新增組態。

AWS CLI
從 更新執行中的叢集以使用 Flink 和 Java 11 執行時間 CLI

使用 modify-instance-groups 命令,為執行中叢集中的執行個體群組指定新組態。

  1. 首先,建立將 Flink 設定為使用 Java 11 的組態檔案 configurations.json。在下列範例中,將 ig-1xxxxxxx9取代為您要重新設定之執行個體群組的 ID。將檔案儲存在執行 modify-instance-groups 命令的相同目錄中。

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. 從 中 AWS CLI,執行下列命令。取代您要重新設定的執行個體群組的 ID:

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

若要判斷執行中叢集的 Java 執行時間,請使用 登入主節點SSH,如使用 連線至主節點SSH中所述。然後執行以下命令:

ps -ef | grep flink

具有 -ef 選項的 ps 命令列出了系統上所有執行中的程序。您可以使用 grep 篩選該輸出,以尋找提及的字串 flink。檢閱 Java Runtime Environment (JRE) 值 的輸出jre-XX。在下列輸出中,jre-11 指出在執行期為 Flink 選擇 Java 11。

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

或者,使用 登入主節點SSH,然後使用命令 啟動 Flink YARN工作階段flink-yarn-session -d。輸出顯示 Flink 的 Java 虛擬機器 (JVM),範例java-11-amazon-corretto如下:

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64