Configuration de Flink sur Amazon EMR - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Configuration de Flink sur Amazon EMR

Les EMR versions 6.9.0 et supérieures d'Amazon prennent en charge à la fois Hive Metastore et AWS Glue Catalog avec le connecteur Apache Flink vers Hive. Cette section décrit les étapes nécessaires pour configurer AWS Glue Catalog et Hive Metastore avec Flink.

  1. Créez un EMR cluster avec la version 6.9.0 ou supérieure et au moins deux applications : Hive et Flink.

  2. Utilisez script runner pour exécuter le script suivant en tant que fonction d'étape :

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. Créez un EMR cluster avec la version 6.9.0 ou supérieure et au moins deux applications : Hive et Flink.

  2. Sélectionnez Utiliser les métadonnées de la table Hive dans les paramètres du catalogue de données AWS Glue pour activer le catalogue de données dans le cluster.

  3. Utilisez le script Runner pour exécuter le script suivant en tant que fonction d'étape : Exécutez des commandes et des scripts sur un EMR cluster Amazon :

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Vous pouvez utiliser la EMR configuration Amazon API pour configurer Flink à l'aide d'un fichier de configuration. Les fichiers configurables dans le API sont les suivants :

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Le fichier de configuration principal de Flink est flink-conf.yaml.

Pour configurer le nombre d'emplacements de tâches utilisés par Flink à partir du fichier AWS CLI
  1. Créez un fichier, configurations.json, contenant les éléments suivants :

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. Créez ensuite un cluster à l'aide de la configuration suivante :

    aws emr create-cluster --release-label emr-7.3.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
Note

Vous pouvez également modifier certaines configurations avec le FlinkAPI. Pour plus d'informations, consultez Concepts dans la documentation Flink.

Avec Amazon EMR version 5.21.0 et versions ultérieures, vous pouvez remplacer les configurations de cluster et spécifier des classifications de configuration supplémentaires pour chaque groupe d'instances d'un cluster en cours d'exécution. Pour ce faire, utilisez la EMR console Amazon, le AWS Command Line Interface (AWS CLI) ou le AWS SDK. Pour plus d'informations, consultez Fourniture d'une configuration pour un groupe d'instances dans un cluster en cours d'exécution.

En tant que propriétaire de votre application, c'est vous qui savez le mieux quelles ressources attribuer aux tâches dans Flink. Pour les exemples de cette documentation, utilisez le même nombre de tâches que les instances de tâches que vous utilisez pour l'application. Généralement, nous recommandons cela pour le niveau initial de parallélisme, mais vous pouvez également augmenter la granularité de parallélisme à l'aide des emplacements de tâches, qui ne doivent généralement pas dépasser le nombre de cœurs virtuels par instance. Pour plus d'informations sur l'architecture de Flink, consultez Concepts dans la documentation Flink.

Le JobManager de Flink reste disponible pendant le processus de basculement du nœud principal dans un EMR cluster Amazon comportant plusieurs nœuds principaux. À partir d'Amazon EMR 5.28.0, la JobManager haute disponibilité est également activée automatiquement. Aucune configuration manuelle n'est nécessaire.

Avec EMR les versions 5.27.0 ou antérieures d'Amazon, il JobManager s'agit d'un point de défaillance unique. En cas d' JobManager échec, il perd tous les états des tâches et ne reprend pas les tâches en cours d'exécution. Vous pouvez activer la JobManager haute disponibilité en configurant le nombre de tentatives d'application, le point de contrôle et ZooKeeper en activant le stockage d'état pour Flink, comme le montre l'exemple suivant :

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

Vous devez configurer le nombre maximal de tentatives de master d'application pour Flink YARN et de tentatives d'application pour Flink. Pour plus d'informations, consultez la section Configuration de la haute disponibilité du YARN cluster. Vous pouvez également configurer le point de contrôle Flink pour que les tâches de JobManager restauration en cours d'exécution redémarrées soient restaurées à partir de points de contrôle précédemment terminés. Pour plus d'informations, consultez Flink checkpointing.

Pour EMR les versions d'Amazon qui utilisent Flink 1.11.x, vous devez configurer la taille totale du processus de mémoire pour JobManager (jobmanager.memory.process.size) et TaskManager (taskmanager.memory.process.size) dans. flink-conf.yaml Vous pouvez définir ces valeurs soit en configurant le cluster avec la configuration, API soit en décommentant manuellement ces champs viaSSH. Flink fournit les valeurs par défaut suivantes.

  • jobmanager.memory.process.size : 1600m

  • taskmanager.memory.process.size : 1728m

Pour exclure le JVM métaspace et la surcharge, utilisez la taille de mémoire totale de Flink (taskmanager.memory.flink.size) au lieu de. taskmanager.memory.process.size La valeur par défaut du paramètre taskmanager.memory.process.size est 1280m. Il n'est pas recommandé de définir à la fois taskmanager.memory.process.size et taskmanager.memory.process.size.

Toutes les EMR versions d'Amazon qui utilisent Flink 1.12.0 et versions ultérieures ont les valeurs par défaut répertoriées dans l'ensemble open source pour Flink comme valeurs par défaut sur AmazonEMR. Vous n'avez donc pas besoin de les configurer vous-même.

Les conteneurs d'applications Flink créent et écrivent dans trois types de fichiers journaux : fichiers .out, fichiers .log et fichiers .err. Seuls .err les fichiers sont compressés et supprimés du système de fichiers, tandis que les fichiers journaux .log et .out restent dans le système de fichiers. Pour garantir la gérabilité de ces fichiers de sortie et la stabilité du cluster, vous pouvez configurer la rotation des journaux dans log4j.properties afin de définir un nombre maximum de fichiers et de limiter leur taille.

Amazon EMR versions 5.30.0 et ultérieures

À partir d'Amazon EMR 5.30.0, Flink utilise le framework de journalisation log4j2 avec le nom de classification de configuration. L'exemple de configuration suivant illustre flink-log4j. le format log4j2.

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMR versions 5.29.0 et antérieures

Avec EMR les versions 5.29.0 et antérieures d'Amazon, Flink utilise le framework de journalisation log4j. L'exemple de configuration suivant illustre le format log4j.

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

EMRLes versions 6.12.0 et supérieures d'Amazon fournissent un support d'exécution Java 11 pour Flink. Les sections suivantes décrivent comment configurer le cluster pour fournir un support d'exécution Java 11 pour Flink.

Procédez comme suit pour créer un EMR cluster avec Flink et Java 11 Runtime. Le fichier de configuration dans lequel vous ajoutez la prise en charge de Java 11 est flink-conf.yaml.

Console
Pour créer un cluster avec Flink et Java 11 Runtime dans la console
  1. Connectez-vous au et ouvrez AWS Management Console la EMR console Amazon à l'adresse https://console.aws.amazon.com/emr.

  2. Choisissez Clusters sous EMRactivé EC2 dans le volet de navigation, puis Create cluster.

  3. Sélectionnez Amazon EMR version 6.12.0 ou supérieure, puis choisissez d'installer l'application Flink. Sélectionnez les autres applications que vous souhaitez installer sur votre cluster.

  4. Poursuivez la configuration de votre cluster. Dans la section facultatifs Paramètres logiciels, utilisez l'option par défaut Entrer un configuration et entrez la configuration suivante :

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. Continuez à configurer et à lancer votre cluster.

AWS CLI
Pour créer un cluster avec Flink et le runtime Java 11 à partir du CLI
  1. Créez un fichier de configuration configurations.json qui configure Flink pour utiliser Java 11.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. À partir de AWS CLI, créez un nouveau EMR cluster avec Amazon EMR version 6.12.0 ou ultérieure, et installez l'application Flink, comme indiqué dans l'exemple suivant :

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

Procédez comme suit pour mettre à jour un EMR cluster en cours d'exécution avec Flink et Java 11 Runtime. Le fichier de configuration dans lequel vous ajoutez la prise en charge de Java 11 est flink-conf.yaml.

Console
Pour mettre à jour un cluster en cours d'exécution avec Flink et Java 11 Runtime dans la console
  1. Connectez-vous au et ouvrez AWS Management Console la EMR console Amazon à l'adresse https://console.aws.amazon.com/emr.

  2. Choisissez Clusters sous EMRactivé EC2 dans le volet de navigation, puis sélectionnez le cluster que vous souhaitez mettre à jour.

    Note

    Le cluster doit utiliser Amazon EMR version 6.12.0 ou ultérieure pour prendre en charge Java 11.

  3. Sélectionnez l'onglet Configurations.

  4. Dans la section Configurations du groupe d'instances, sélectionnez le groupe d'instances en cours d'exécution que vous souhaitez mettre à jour, puis choisissez Reconfigurer dans le menu d'actions de la liste.

  5. Reconfigurez le groupe d'instances avec l'option Modifier les attributs comme suit. Sélectionnez Ajouter une nouvelle configuration après chacune d'entre elles.

    Classification Propriété Valeur

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. Sélectionnez Enregistrer les modifications pour ajouter les configurations.

AWS CLI
Pour mettre à jour un cluster en cours d'exécution afin d'utiliser Flink et le runtime Java 11 à partir du CLI

Utilisez la commande modify-instance-groups pour spécifier une nouvelle configuration pour un groupe d'instances dans un cluster en cours d'exécution.

  1. Créez d'abord un fichier de configuration configurations.json qui configure Flink pour utiliser Java 11. Dans l'exemple suivant, remplacez ig-1xxxxxxx9 avec l'ID du groupe d'instances que vous souhaitez reconfigurer. Enregistrez le fichier dans le même répertoire que celui où vous exécuterez la commande modify-instance-groups.

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. À partir de AWS CLI, exécutez la commande suivante. Remplacez l'ID du groupe d'instances que vous souhaitez reconfigurer :

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

Pour déterminer l'environnement d'exécution Java d'un cluster en cours d'exécution, connectez-vous au nœud principal avec, SSH comme décrit dans la section Se connecter au nœud principal avec SSH. Ensuite, exécutez la commande suivante :

ps -ef | grep flink

La commande ps associée à l'option -ef répertorie tous les processus en cours d'exécution sur le système. Vous pouvez filtrer cette sortie avec grep pour trouver les mentions de la chaîne flink. Vérifiez le résultat pour la valeur Java Runtime Environment (JRE),jre-XX. Dans le résultat suivant, jre-11 indique que Java 11 est activé lors de l'exécution de Flink.

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

Vous pouvez également vous connecter au nœud principal avec une commande SSH et démarrer une YARN session Flink avec cette commandeflink-yarn-session -d. La sortie montre la machine virtuelle Java (JVM) pour Flink, java-11-amazon-corretto dans l'exemple suivant :

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64