Utilisation de la haute disponibilité (HA) pour les opérateurs et les applications Flink - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de la haute disponibilité (HA) pour les opérateurs et les applications Flink

Cette rubrique explique comment configurer la haute disponibilité et décrit son fonctionnement pour différents cas d'utilisation. Cela inclut lorsque vous utilisez le Job Manager et lorsque vous utilisez les kubernetes natifs de Flink.

Nous activons la haute disponibilité de l’opérateur Flink afin de pouvoir basculer vers un opérateur Flink de secours et réduire au minimum les temps d’arrêt dans la boucle de contrôle de l’opérateur en cas de défaillance. La haute disponibilité est activée par défaut et le nombre initial par défaut de répliques d’opérateur est de 2. Vous pouvez configurer le champ des répliques dans votre fichier values.yaml pour les Charts de Helm.

Les champs suivants sont personnalisables :

  • replicas (facultatif, la valeur par défaut est 2) : si vous définissez ce nombre sur une valeur supérieure à 1, d'autres opérateurs de secours seront créés et vous pourrez reprendre votre tâche plus rapidement.

  • highAvailabilityEnabled (facultatif, la valeur par défaut est « true ») : permet d'indiquer si vous souhaitez activer la haute disponibilité (HA). Le fait de définir ce paramètre comme « true » permet de prendre en charge le déploiement multi-AZ et de définir les paramètres flink-conf.yaml corrects.

Vous pouvez désactiver la haute disponibilité pour votre opérateur en paramétrant la configuration ci-dessous dans votre fichier values.yaml.

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

Déploiement multi-AZ

Nous créons les pods des opérateurs dans plusieurs zones de disponibilité. Il s'agit d'une contrainte souple, et vos pods d'opérateur seront planifiés dans la même zone si vous ne disposez pas de suffisamment de ressources dans une autre zone.

Détermination de la réplique leader

Si HA est activé, les répliques utilisent un bail pour déterminer lequel des deux JMs est le leader et utilisent un bail K8s pour l'élection du leader. Vous pouvez décrire le bail et consulter le champ .Spec.Holder Identity pour déterminer le leader actuel

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Interaction Flink-S3

Configuration des informations d'identification d'accès

Assurez-vous que vous avez configuré IRSA avec IAM les autorisations appropriées pour accéder au compartiment S3.

Récupération des fichiers JAR des tâches à partir du mode d'application S3

L'opérateur Flink prend également en charge la récupération des fichiers JAR des applications à partir de S3. Il vous suffit de fournir l'emplacement S3 du pot URI dans votre FlinkDeployment spécification.

Vous pouvez également utiliser cette fonctionnalité pour télécharger d'autres artefacts tels que PyFlink des scripts. Le script Python résultant est déposé sous le chemin /opt/flink/usrlib/.

L'exemple suivant montre comment utiliser cette fonctionnalité pour une PyFlink tâche. Notez les champs jar URI et args.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Connecteurs S3 pour Flink

Flink est livré avec deux connecteurs S3 (indiqués ci-dessous). Les sections suivantes expliquent quand utiliser quel connecteur.

Point de contrôle : connecteur S3 pour Presto

  • Définissez le schéma S3 sur s3p://

  • Le connecteur recommandé à utiliser pour le point de contrôle vers S3. Pour plus d'informations, consultez la section spécifique à S3 dans la documentation d'Apache Flink.

Exemple de FlinkDeployment spécification :

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/

Lecture et écriture sur S3 : connecteur Hadoop S3

  • Définissez le schéma S3 sur s3:// ou (s3a://)

  • Le connecteur recommandé pour lire et écrire des fichiers à partir de S3 (uniquement le connecteur S3 qui implémente l'interface Filesystem de Flinks).

  • Par défaut, nous avons défini fs.s3a.aws.credentials.provider le flink-conf.yaml fichier, qui estcom.amazonaws.auth.WebIdentityTokenCredentialsProvider. Si vous remplacez complètement la flink-conf par défaut et que vous interagissez avec S3, assurez-vous d'utiliser ce fournisseur.

Exemple de FlinkDeployment spécification

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

La haute disponibilité (HA) pour les déploiements de Flink permet aux tâches de continuer à progresser même en cas d'erreur transitoire et de panne. JobManager Les tâches redémarreront, mais à partir du dernier point de contrôle validé lorsque la haute disponibilité est activée. Si la haute disponibilité n'est pas activée, Kubernetes redémarrera votre tâche JobManager, mais votre tâche redémarrera comme une nouvelle tâche et perdra sa progression. Après avoir configuré HA, nous pouvons demander à Kubernetes de stocker les métadonnées HA dans un stockage persistant afin de les référencer en cas de défaillance transitoire du, JobManager puis de reprendre nos tâches à partir du dernier point de contrôle réussi.

La haute disponibilité est activée par défaut pour vos tâches Flink (le nombre de répliques est défini sur 2, ce qui nécessite la mise à disposition d'un emplacement de stockage S3 pour l'enregistrement des métadonnées haute disponibilité).

Configurations de haute disponibilité

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

Vous trouverez ci-dessous les descriptions des configurations HA ci-dessus dans Job Manager (définies sous .spec). jobManager) :

  • highAvailabilityEnabled (facultatif, la valeur par défaut est « true ») : définissez ce paramètre sur false si vous ne voulez pas activer la haute disponibilité et si vous ne souhaitez pas utiliser les configurations de haute disponibilité fournies. Vous pouvez toujours manipuler le champ « replicas » pour configurer manuellement la haute disponibilité.

  • replicas(facultatif, la valeur par défaut est 2) : Si vous définissez ce nombre sur une valeur supérieure à 1, vous créez un autre mode de veille JobManagers et vous pouvez reprendre votre travail plus rapidement. Si vous désactivez la haute disponibilité, vous devez définir le nombre de répliques sur 1, sinon vous continuerez à recevoir des erreurs de validation (une seule réplique est prise en charge si la haute disponibilité n'est pas activée).

  • storageDir(obligatoire) : étant donné que nous utilisons le nombre de répliques comme 2 par défaut, nous devons fournir une valeur persistantestorageDir. Actuellement, ce champ n'accepte que les chemins S3 comme emplacement de stockage.

Placement des pods

Si vous activez la haute disponibilité, nous essayons également de colocaliser les pods dans la même zone, ce qui améliore les performances (réduction de la latence du réseau grâce à la présence de pods dans les mêmes zonesAZs). Ceci est un processus réalisé au mieux des possibilités, signifiant que si vous ne disposez pas de ressources suffisantes dans la zone de disponibilité où la majorité de vos pods sont planifiés, les pods restants seront tout de même planifiés, mais pourraient se retrouver sur un nœud situé en dehors de cette zone de disponibilité.

Détermination de la réplique leader

Si HA est activé, les répliques utilisent un bail pour déterminer lequel d'entre eux JMs est le leader et utilisent un K8s Configmap comme banque de données pour stocker ces métadonnées. Si vous souhaitez identifier le leader, vous pouvez consulter le contenu de la Configmap et la clé org.apache.flink.k8s.leader.restserver sous les données pour trouver le pod K8s correspondant à l’adresse IP indiquée. Vous pouvez également utiliser les commandes bash ci-dessous.

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 et versions ultérieures prennent en charge Kubernetes natif de Flink pour exécuter des applications Flink en mode haute disponibilité sur un cluster Amazon. EKS

Note

Il est nécessaire de disposer d'un compartiment Amazon S3 préalablement créé pour conserver les métadonnées de haute disponibilité lorsque vous soumettez votre tâche Flink. Si vous ne souhaitez pas utiliser cette fonctionnalité, vous pouvez la désactiver. Elle est activée par défaut.

Pour activer la fonction de haute disponibilité de Flink, fournissez les paramètres Flink suivants lorsque vous exécutez la commande. run-application CLI Les paramètres sont définis dans l’exemple ci-dessous.

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir : compartiment Amazon S3 où vous souhaitez stocker les métadonnées de haute disponibilité pour votre tâche

    Dkubernetes.jobmanager.replicas : nombre de pods Job Manager à créer sous la forme d’un entier supérieur à 1

    Dkubernetes.cluster-id : ID unique qui identifie le cluster Flink