Configurazione di Flink in Amazon EMR - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Configurazione di Flink in Amazon EMR

EMRLe versioni 6.9.0 e successive di Amazon supportano sia Hive Metastore che AWS Glue Catalog con il connettore Apache Flink a Hive. Questa sezione descrive i passaggi necessari per configurare Catalogo AWS Glue e Hive Metastore con Flink.

  1. Crea un EMR cluster con la versione 6.9.0 o successiva e almeno due applicazioni: Hive e Flink.

  2. Utilizza script runner per eseguire il seguente script come funzione passo passo:

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. Crea un EMR cluster con la versione 6.9.0 o successiva e almeno due applicazioni: Hive e Flink.

  2. Seleziona Utilizza per i metadati delle tabelle Hive nelle impostazioni del Catalogo dati AWS Glue per abilitare il Catalogo dati nel cluster.

  3. Usa script runner per eseguire il seguente script come funzione graduale: Esegui comandi e script su un cluster Amazon EMR:

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Puoi utilizzare la EMR configurazione di Amazon API per configurare Flink con un file di configurazione. I file configurabili all'interno di sono: API

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Il file di configurazione principale per Flink è flink-conf.yaml.

Per configurare il numero di slot di attività utilizzati per Flink dalla AWS CLI
  1. Creare un file, configurations.json, con i seguenti contenuti:

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. Quindi, creare un cluster con la seguente configurazione:

    aws emr create-cluster --release-label emr-7.3.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
Nota

È inoltre possibile modificare alcune configurazioni con Flink. API Per ulteriori informazioni, consulta Concepts (Concetti) nella documentazione di Flink.

Con Amazon EMR versione 5.21.0 e successive, puoi sovrascrivere le configurazioni del cluster e specificare classificazioni di configurazione aggiuntive per ogni gruppo di istanze in un cluster in esecuzione. Puoi farlo utilizzando la EMR console Amazon, il AWS Command Line Interface (AWS CLI) o il AWS SDK. Per ulteriori informazioni, consulta Specifica di una configurazione per un gruppo di istanze in un cluster in esecuzione.

Il proprietario dell'applicazione sa quali risorse assegnare alle attività all'interno di Flink. Per gli esempi riportati in questa documentazione, utilizza lo stesso numero di attività delle istanze di attività utilizzate per l'applicazione. Generalmente lo consigliamo per il livello iniziale di parallelismo, ma è anche possibile aumentare la granularità del parallelismo con gli slot delle attività, che generalmente non dovrebbero superare il numero di core virtuali per istanza. Per ulteriori informazioni sull'architettura Flink, consulta Concetti nella documentazione di Flink.

JobManager of Flink rimane disponibile durante il processo di failover del nodo primario in un EMR cluster Amazon con più nodi primari. A partire da Amazon EMR 5.28.0, anche l' JobManager alta disponibilità viene abilitata automaticamente. Non è necessaria alcuna configurazione manuale.

Con EMR le versioni di Amazon 5.27.0 o precedenti, JobManager esiste un singolo punto di errore. In caso di JobManager errore, perde tutti gli stati del processo e non riprende i processi in esecuzione. È possibile abilitare l' JobManager alta disponibilità configurando il conteggio dei tentativi di applicazione, il checkpoint e l'attivazione ZooKeeper come storage di stato per Flink, come dimostra l'esempio seguente:

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

È necessario configurare sia il numero massimo di tentativi di applicazione master per Flink che i tentativi di applicazione per YARN Flink. Per ulteriori informazioni, vedere Configurazione dell'alta disponibilità del YARN cluster. Potresti anche voler configurare il checkpoint Flink per riavviare e JobManager ripristinare i job in esecuzione dai checkpoint precedentemente completati. Per ulteriori informazioni, consulta Checkpoint di Flink.

Per EMR le versioni di Amazon che utilizzano Flink 1.11.x, devi configurare la dimensione totale del processo di memoria sia per () che per JobManager (jobmanager.memory.process.size) in. TaskManager taskmanager.memory.process.size flink-conf.yaml Puoi impostare questi valori configurando il cluster con la configurazione API o decommentando manualmente questi campi tramite. SSH Flink fornisce i seguenti valori di default.

  • jobmanager.memory.process.size: 1600m

  • taskmanager.memory.process.size: 1728m

Per escludere JVM metaspace e sovraccarico, utilizzate la dimensione totale della memoria Flink () anziché. taskmanager.memory.flink.size taskmanager.memory.process.size Il valore di default per taskmanager.memory.process.size è 1280m. Non è consigliabile impostare sia taskmanager.memory.process.size che taskmanager.memory.process.size.

Tutte le EMR versioni di Amazon che utilizzano Flink 1.12.0 e versioni successive hanno i valori predefiniti elencati nel set open source per Flink come valori predefiniti su AmazonEMR, quindi non è necessario configurarli personalmente.

I container di applicazioni Flink creano e scrivono in tre tipi di file di log: file .out, file .log e file .err. Solo i file .err vengono compressi e rimossi dal file system, mentre i file di log .log e .out rimangono nel file system. Per garantire che questi file di output rimangano gestibili e che il cluster rimanga stabile, è possibile configurare la rotazione dei log in log4j.properties per impostare un numero massimo di file e limitarne le dimensioni.

Amazon EMR versioni 5.30.0 e successive

A partire da Amazon EMR 5.30.0, Flink utilizza il framework di registrazione log4j2 con il nome flink-log4j. di classificazione della configurazione. La seguente configurazione di esempio illustra il formato log4j2.

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMR versioni 5.29.0 e precedenti

Con EMR le versioni Amazon 5.29.0 e precedenti, Flink utilizza il framework di registrazione log4j. La seguente configurazione di esempio illustra il formato log4j

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

EMRLe versioni 6.12.0 e successive di Amazon forniscono il supporto di runtime Java 11 per Flink. Le sezioni seguenti descrivono come configurare il cluster per fornire il supporto di runtime Java 11 per Flink.

Utilizza i seguenti passaggi per creare un EMR cluster con Flink e Java 11 runtime. Il file di configurazione in cui aggiungere il supporto per il runtime Java 11 è flink-conf.yaml.

Console
Per creare un cluster con Flink e Java 11 runtime nella console
  1. Accedi a e apri AWS Management Console la EMR console Amazon all'indirizzo https://console.aws.amazon.com/emr.

  2. Scegli Clusters EC2 in EMRon nel riquadro di navigazione, quindi Crea cluster.

  3. Seleziona Amazon EMR release 6.12.0 o successiva e scegli di installare l'applicazione Flink. Seleziona tutte le altre applicazioni che desideri installare sul tuo cluster.

  4. Continua a configurare il cluster. Nella sezione Impostazioni software opzionali, utilizza l'opzione predefinita Inserisci configurazione e inserisci la seguente configurazione:

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. Continua a configurare e avvia il cluster.

AWS CLI
Per creare un cluster con Flink e Java 11 runtime dal CLI
  1. Crea un file di configurazione configurations.json che configuri Flink per l'utilizzo di Java 11.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. Da AWS CLI, crea un nuovo EMR cluster con Amazon EMR versione 6.12.0 o successiva e installa l'applicazione Flink, come mostrato nell'esempio seguente:

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

Utilizza i seguenti passaggi per aggiornare un EMR cluster in esecuzione con Flink e Java 11 runtime. Il file di configurazione in cui aggiungere il supporto per il runtime Java 11 è flink-conf.yaml.

Console
Per aggiornare un cluster in esecuzione con Flink e Java 11 runtime nella console
  1. Accedi a e apri AWS Management Console la EMR console Amazon all'indirizzo https://console.aws.amazon.com/emr.

  2. Scegli Clusters sotto EMREC2nel riquadro di navigazione, quindi seleziona il cluster che desideri aggiornare.

    Nota

    Il cluster deve utilizzare la EMR versione Amazon 6.12.0 o successiva per supportare Java 11.

  3. Seleziona la scheda Configurazione.

  4. Nella sezione Configurazioni del gruppo di istanze, seleziona il gruppo di istanze In esecuzione che desideri aggiornare, quindi scegli Riconfigura dal menu delle azioni dell'elenco.

  5. Riconfigura il gruppo di istanze con l'opzione Modifica attributi come segue. Seleziona Aggiungi nuova configurazione dopo ognuna di esse.

    Classificazione Proprietà Valore

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. Quindi seleziona Salva modifiche per aggiungere le configurazioni.

AWS CLI
Per aggiornare un cluster in esecuzione per utilizzare Flink e Java 11 runtime dal CLI

Utilizza il comando modify-instance-groups per specificare una nuova configurazione per un gruppo di istanze in un cluster in esecuzione.

  1. Innanzitutto, crea un file di configurazione configurations.json che configuri Flink per l'utilizzo di Java 11. Nell'esempio seguente, sostituisci ig-1xxxxxxx9 con l'ID del gruppo di istanze che desiderate riconfigurare. Salva il file nella stessa directory in cui eseguirai il comando modify-instance-groups.

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. Da AWS CLI, esegui il comando seguente. Sostituisci l'ID per il gruppo di istanze che desideri riconfigurare:

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

Per determinare il runtime Java per un cluster in esecuzione, accedi al nodo primario con SSH come descritto in Connect to the primary node with SSH. Quindi, esegui il comando riportato di seguito:

ps -ef | grep flink

Il comando ps con l'opzione -ef elenca tutti i processi in esecuzione sul sistema. È possibile filtrare l'output con grep per trovare le menzioni della stringa flink. Esaminate l'output per il valore Java Runtime Environment (JRE),jre-XX. Nell'output seguente, jre-11 indica che Java 11 viene rilevato in fase di runtime per Flink.

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

In alternativa, accedi al nodo primario con SSH e avvia una YARN sessione Flink con il comandoflink-yarn-session -d. L'output mostra la Java Virtual Machine (JVM) per Flink, java-11-amazon-corretto nel seguente esempio:

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64