Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Uso de la alta disponibilidad (HA) para Flink Operators y Flink Applications
En este tema se muestra cómo configurar la alta disponibilidad y se describe cómo funciona en algunos casos de uso diferentes. Aquí se incluye cuando usa el administrador de trabajos y cuando usa Kubernetes nativo de Flink.
Alta disponibilidad del Flink Operator
Habilitamos la alta disponiblidad para el Flink Operator para poder realizar un cambio a un Flink Operator en espera y minimizar el tiempo de inactividad en el bucle de control del operador en caso de fallos. La alta disponibilidad está habilitada de forma predeterminada y el número predeterminado de réplicas de operadores de inicio es 2. Puede configurar el campo de réplicas en su archivo values.yaml
para el gráfico de Helm.
Los siguientes campos se pueden personalizar:
-
replicas
(opcional, el valor predeterminado es 2): si se establece este número en uno mayor que 1, se crean otros operadores en espera y se puede recuperar el trabajo con mayor rapidez. -
highAvailabilityEnabled
(opcional, el valor predeterminado es verdadero): controla si desea habilitar la alta disponibilidad. Si se especifica este parámetro como verdadero, se habilita la compatibilidad con la implementación multi-AZ y se establecen los parámetrosflink-conf.yaml
correctos.
Puede deshabilitar la alta disponibilidad para su operador al establecer la siguiente configuración en su archivo values.yaml
.
... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...
Implementación Multi-AZ
Creamos los pods de los operadores en varias zonas de disponibilidad. Se trata de una limitación leve y, si no dispone de recursos suficientes en una zona de disponibilidad diferente, los pods de los operadores se programarán en la misma zona de disponibilidad.
Determinar la réplica líder
Si la alta disponibilidad está habilitada, las réplicas utilizan un arrendamiento para determinar cuál de los JM es el líder y utilizan un arrendamiento de los K8 para la elección del líder. Puede describir el arrendamiento y consultar el campo .Spec.Holder Identity para determinar el líder actual
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
Interacción entre Flink y S3
Configuración de las credenciales de acceso
Asegúrese de haber configurado las IRSA con los permisos de IAM adecuados para acceder al bucket de S3.
Búsqueda de archivos jar de trabajo desde el modo aplicación de S3
El operador de Flink también admite la búsqueda de archivos jar de aplicaciones desde S3. Solo tiene que proporcionar la ubicación de S3 del jarURI en su especificación de FlinkDeployment.
También puede usar esta característica para descargar otros artefactos como los scripts de PyFlink. El script de Python resultante se coloca en la ruta /opt/flink/usrlib/
.
En el siguiente ejemplo, se muestra cómo utilizar esta característica para un trabajo de PyFlink. Tenga en cuenta los campos jarURI y args.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless
Conectores de Flink de S3
Flink viene empaquetado con dos conectores de S3 (enumerados a continuación). En las siguientes secciones se explica cuándo usar cada conector.
Punto de control: conector de Presto de S3
-
Establezca el esquema de S3 en s3p://
-
El conector recomendado para establecer el punto de control en s3. Para obtener más información, consulte Específico de S3
en la documentación de Apache Flink.
Especificación de FlinkDeployment de ejemplo:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
Lectura y escritura en S3: conector Hadoop S3
-
Establezca el esquema de S3 en
s3://
o en (s3a://
) -
El conector recomendado para leer y escribir archivos desde S3 (solo el conector de S3 que implementa la interfaz del sistema de archivos de Flink
). -
De forma predeterminada, establecemos
fs.s3a.aws.credentials.provider
en el archivoflink-conf.yaml
, que escom.amazonaws.auth.WebIdentityTokenCredentialsProvider
. Si anula completamente el valor predeterminadoflink-conf
y está interactuando con S3, asegúrese de usar este proveedor.
Especificación de FlinkDeployment de ejemplo
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless
Flink JobManager
La alta disponibilidad (HA) para las implementaciones de Flink permite que los trabajos sigan progresando incluso si se produce un error transitorio y JobManager se bloquea. Los trabajos se reiniciarán, pero desde el último punto de control exitoso con la alta disponibilidad activada. Si la alta disponibilidad no está activada, Kubernetes reiniciará su JobManager, pero su trabajo empezará como un trabajo nuevo y perderá todo progreso. Después de configurar la alta disponibilidad, podemos indicar a Kubernetes que almacene los metadatos de la HA en un almacenamiento persistente para consultarlos en caso de que se produzca un error transitorio en el JobManager y, a continuación, reanudar nuestros trabajos desde el último punto de control exitoso.
La alta disponibilidad está habilitada de forma predeterminada para sus trabajos de Flink (el recuento de réplicas está establecido en 2, por lo que tendrá que proporcionar una ubicación de almacenamiento en S3 para que los metadatos de alta disponibilidad se conserven).
Configuraciones de alta disponibilidad
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1
Las siguientes son descripciones de las configuraciones de alta disponibilidad anteriores en Job Manager (definidas en .spec.JobManager):
-
highAvailabilityEnabled
(opcional, el valor predeterminado es verdadero): configúrela enfalse
si no quiere activar la alta disponibilidad y no quiere utilizar las configuraciones de alta disponibilidad proporcionadas. Aún puede manipular el campo “réplicas” para configurar de forma manual la alta disponibilidad. -
replicas
(opcional, el valor predeterminado es 2): si se establece este número en uno mayor que 1, se crean otros JobManagers en espera y permite una recuperación más rápida del trabajo. Si deshabilita la alta disponibilidad, debe establecer el recuento de réplicas en 1 o seguirá recibiendo errores de validación (solo se admite 1 réplica si la alta disponibilidad no está habilitada). -
storageDir
(obligatorio): dado que utilizamos el recuento de réplicas como 2 de forma predeterminada, debemos proporcionar un storageDir persistente. Actualmente, este campo solo acepta rutas de S3 como ubicación de almacenamiento.
Localidad del pod
Si habilita la alta disponibilidad, también intentamos colocar los pods en la misma zona de disponibilidad, lo que mejora el rendimiento (al tener los pods en las mismas zonas de disponibilidad, se reduce la latencia de la red). Se trata de un proceso de mejor esfuerzo, ya que, si no dispone de recursos suficientes en la zona de disponibilidad en la que están programados la mayoría de sus pods, los demás pods seguirán estando programados, pero es posible que acaben en un nodo fuera de esta zona de disponibilidad.
Determinar la réplica líder
Si la alta disponibilidad está habilitada, las réplicas usan una concesión para determinar cuál de los JM es el líder y usan un ConfigMap de K8 como almacén de datos para almacenar estos metadatos. Si quiere determinar el líder, consulte el contenido del Configmap y busque en los datos la clave org.apache.flink.k8s.leader.restserver
para encontrar los pod de K8 con la dirección IP. También puede utilizar los siguientes comandos bash.
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n
NAMESPACE
-o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
Trabajo de Flink: Kubernetes nativo
A partir de la versión 6.13.0, Amazon EMR es compatible con Kubernetes nativo de Flink para ejecutar aplicaciones de Flink en un clúster de Amazon EKS.
nota
Debe tener un bucket de Amazon S3 creado para almacenar los metadatos de alta disponibilidad cuando envíe su trabajo de Flink. Si no desea usar esta característica, puede desactivarla. Está habilitada de forma predeterminada.
Para activar la característica de alta disponibilidad de Flink, utilice los siguientes parámetros de Flink al ejecutar el comando de la CLI run-application. Los parámetros se definen debajo del ejemplo.
-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=
S3://DOC-EXAMPLE-STORAGE-BUCKET
\ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
-
Dhigh-availability.storageDir
: el bucket de Amazon S3 en el que desea almacenar los metadatos de alta disponibilidad para su trabajo.Dkubernetes.jobmanager.replicas
: el número de pods de Job Manager que se van a crear como un entero superior a1
.Dkubernetes.cluster-id
: un identificador único que identifica el clúster de Flink.