Configuration de Flink dans Amazon EMR - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Configuration de Flink dans Amazon EMR

Les versions 6.9.0 et supérieures d'Amazon EMR prennent en charge à la fois Hive Metastore et AWS Glue Catalog avec le connecteur Apache Flink vers Hive. Cette section décrit les étapes nécessaires pour configurer AWS Glue Catalog et Hive Metastore avec Flink.

  1. Créez un cluster EMR avec la version 6.9.0 ou supérieure et au moins deux applications : Hive et Flink.

  2. Utilisez script runner pour exécuter le script suivant en tant que fonction d'étape :

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. Créez un cluster EMR avec la version 6.9.0 ou supérieure et au moins deux applications : Hive et Flink.

  2. Sélectionnez Utiliser les métadonnées de la table Hive dans les paramètres du catalogue de données AWS Glue pour activer le catalogue de données dans le cluster.

  3. Utilisez le script runner pour exécuter le script suivant en tant que fonction d'étape : Exécuter des commandes et des scripts sur un cluster Amazon EMR :

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Vous pouvez utiliser l'API de configuration Amazon EMR pour configurer Flink à l'aide d'un fichier de configuration. Les fichiers configurables dans l'API sont les suivants :

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Le fichier de configuration principal de Flink est flink-conf.yaml.

Pour configurer le nombre d'emplacements de tâches utilisés par Flink à partir du fichier AWS CLI
  1. Créez un fichier, configurations.json, contenant les éléments suivants :

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. Créez ensuite un cluster à l'aide de la configuration suivante :

    aws emr create-cluster --release-label emr-7.6.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
Note

Vous pouvez également modifier certaines configurations avec l'API Flink. Pour plus d'informations, consultez Concepts dans la documentation Flink.

Avec la version 5.21.0 et ultérieures d'Amazon EMR, vous permet de remplacer les configurations de cluster et de spécifier des classifications de configuration supplémentaires pour chaque groupe d'instances dans un cluster en cours d'exécution. Pour ce faire, utilisez la console Amazon EMR, le AWS Command Line Interface (AWS CLI) ou le AWS SDK. Pour plus d'informations, consultez Fourniture d'une configuration pour un groupe d'instances dans un cluster en cours d'exécution.

En tant que propriétaire de votre application, c'est vous qui savez le mieux quelles ressources attribuer aux tâches dans Flink. Pour les exemples de cette documentation, utilisez le même nombre de tâches que les instances de tâches que vous utilisez pour l'application. Généralement, nous recommandons cela pour le niveau initial de parallélisme, mais vous pouvez également augmenter la granularité de parallélisme à l'aide des emplacements de tâches, qui ne doivent généralement pas dépasser le nombre de cœurs virtuels par instance. Pour plus d'informations sur l'architecture de Flink, consultez Concepts dans la documentation Flink.

Le JobManager de Flink reste disponible pendant le processus de basculement du nœud principal dans un cluster Amazon EMR comportant plusieurs nœuds principaux. À partir d'Amazon EMR 5.28.0, la JobManager haute disponibilité est également activée automatiquement. Aucune configuration manuelle n'est nécessaire.

Avec les versions 5.27.0 ou antérieures d'Amazon EMR, il s' JobManager agit d'un point de défaillance unique. En cas d' JobManager échec, il perd tous les états des tâches et ne reprend pas les tâches en cours d'exécution. Vous pouvez activer la JobManager haute disponibilité en configurant le nombre de tentatives d'application, le point de contrôle et ZooKeeper en activant le stockage d'état pour Flink, comme le montre l'exemple suivant :

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

Vous devez configurer le nombre maximal de tentatives du maître d'application pour YARN et les tentatives d'application pour Flink. Pour plus d'informations, consultez Configuration de la haute disponibilité du cluster YARN. Vous pouvez également configurer le point de contrôle Flink pour que les tâches de JobManager restauration en cours d'exécution redémarrées soient restaurées à partir de points de contrôle précédemment terminés. Pour plus d'informations, consultez Flink checkpointing.

Pour les versions d'Amazon EMR qui utilisent Flink 1.11.x, vous devez configurer la taille totale du processus de mémoire pour () et JobManager (jobmanager.memory.process.size) dans. TaskManager taskmanager.memory.process.size flink-conf.yaml Vous pouvez définir ces valeurs soit en configurant le cluster à l'aide de l'API de configuration, soit en décommentant manuellement ces champs via SSH. Flink fournit les valeurs par défaut suivantes.

  • jobmanager.memory.process.size : 1600m

  • taskmanager.memory.process.size : 1728m

Pour exclure le métaspace JVM et la surcharge, utilisez la taille de mémoire totale de Flink (taskmanager.memory.flink.size) au lieu de taskmanager.memory.process.size. La valeur par défaut du paramètre taskmanager.memory.process.size est 1280m. Il n'est pas recommandé de définir à la fois taskmanager.memory.process.size et taskmanager.memory.process.size.

Toutes les versions d'Amazon EMR qui utilisent Flink 1.12.0 et versions ultérieures ont les valeurs par défaut répertoriées dans l'ensemble open source pour Flink comme valeurs par défaut sur Amazon EMR. Vous n'avez donc pas besoin de les configurer vous-même.

Les conteneurs d'applications Flink créent et écrivent dans trois types de fichiers journaux : fichiers .out, fichiers .log et fichiers .err. Seuls .err les fichiers sont compressés et supprimés du système de fichiers, tandis que les fichiers journaux .log et .out restent dans le système de fichiers. Pour garantir la gérabilité de ces fichiers de sortie et la stabilité du cluster, vous pouvez configurer la rotation des journaux dans log4j.properties afin de définir un nombre maximum de fichiers et de limiter leur taille.

Amazon EMR versions 5.30.0 et ultérieures

À partir d'Amazon EMR 5.30.0, Flink utilise le framework de journalisation log4j2 avec le nom de classification de configuration flink-log4j.. L'exemple de configuration suivant illustre le format log4j2.

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMR versions 5.29.0 et antérieures

Avec les versions 5.29.0 et antérieures d'Amazon EMR, Flink utilise le framework de journalisation log4j. L'exemple de configuration suivant illustre le format log4j.

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

Les versions 6.12.0 et supérieures d'Amazon EMR fournissent un support d'exécution Java 11 pour Flink. Les sections suivantes décrivent comment configurer le cluster pour fournir un support d'exécution Java 11 pour Flink.

Procédez comme suit pour créer un cluster EMR avec Flink et Java 11 Runtime. Le fichier de configuration dans lequel vous ajoutez la prise en charge de Java 11 est flink-conf.yaml.

Console
Pour créer un cluster avec Flink et Java 11 Runtime dans la console
  1. Connectez-vous au et ouvrez la AWS Management Console console Amazon EMR à l'adresse /emr. https://console.aws.amazon.com

  2. Choisissez Clusters sous EMR activé EC2 dans le volet de navigation, puis Create cluster.

  3. Sélectionnez Amazon EMR version 6.12.0 ou supérieure, puis choisissez d'installer l'application Flink. Sélectionnez les autres applications que vous souhaitez installer sur votre cluster.

  4. Poursuivez la configuration de votre cluster. Dans la section facultatifs Paramètres logiciels, utilisez l'option par défaut Entrer un configuration et entrez la configuration suivante :

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. Continuez à configurer et à lancer votre cluster.

AWS CLI
Pour créer un cluster avec Flink et Java 11 Runtime à partir de la CLI
  1. Créez un fichier de configuration configurations.json qui configure Flink pour utiliser Java 11.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. À partir de AWS CLI, créez un nouveau cluster EMR avec Amazon EMR version 6.12.0 ou ultérieure, et installez l'application Flink, comme indiqué dans l'exemple suivant :

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

Procédez comme suit pour mettre à jour un cluster EMR en cours d'exécution avec Flink et Java 11 Runtime. Le fichier de configuration dans lequel vous ajoutez la prise en charge de Java 11 est flink-conf.yaml.

Console
Pour mettre à jour un cluster en cours d'exécution avec Flink et Java 11 Runtime dans la console
  1. Connectez-vous au et ouvrez la AWS Management Console console Amazon EMR à l'adresse /emr. https://console.aws.amazon.com

  2. Choisissez Clusters sous EMR activé EC2 dans le volet de navigation, puis sélectionnez le cluster que vous souhaitez mettre à jour.

    Note

    Le cluster doit utiliser Amazon EMR version 6.12.0 ou ultérieure pour prendre en charge Java 11.

  3. Sélectionnez l'onglet Configurations.

  4. Dans la section Configurations du groupe d'instances, sélectionnez le groupe d'instances en cours d'exécution que vous souhaitez mettre à jour, puis choisissez Reconfigurer dans le menu d'actions de la liste.

  5. Reconfigurez le groupe d'instances avec l'option Modifier les attributs comme suit. Sélectionnez Ajouter une nouvelle configuration après chacune d'entre elles.

    Classification Propriété Valeur

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. Sélectionnez Enregistrer les modifications pour ajouter les configurations.

AWS CLI
Pour mettre à jour un cluster en cours d'exécution afin d'utiliser Flink et le runtime Java 11 à partir de la CLI

Utilisez la commande modify-instance-groups pour spécifier une nouvelle configuration pour un groupe d'instances dans un cluster en cours d'exécution.

  1. Créez d'abord un fichier de configuration configurations.json qui configure Flink pour utiliser Java 11. Dans l'exemple suivant, remplacez ig-1xxxxxxx9 par l'ID du groupe d'instances que vous souhaitez reconfigurer. Enregistrez le fichier dans le même répertoire que celui où vous exécuterez la commande modify-instance-groups.

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. À partir de AWS CLI, exécutez la commande suivante. Remplacez l'ID du groupe d'instances que vous souhaitez reconfigurer :

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

Pour déterminer l'environnement d'exécution Java d'un cluster en cours d'exécution, connectez-vous au nœud primaire avec SSH, comme décrit dans Connexion au nœud primaire avec SSH. Ensuite, exécutez la commande suivante :

ps -ef | grep flink

La commande ps associée à l'option -ef répertorie tous les processus en cours d'exécution sur le système. Vous pouvez filtrer cette sortie avec grep pour trouver les mentions de la chaîne flink. Vérifiez le résultat pour la valeur de l'environnement d'exécution Java (JRE), jre-XX. Dans le résultat suivant, jre-11 indique que Java 11 est activé lors de l'exécution de Flink.

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

Vous pouvez également vous connecter au nœud primaire avec SSH et démarrer une session Flink YARN avec une commande flink-yarn-session -d. La sortie montre la machine virtuelle Java (JVM) pour Flink, java-11-amazon-corretto dans l'exemple suivant :

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64