Optimización de los tiempos de reinicio de las tareas de Flink para las operaciones de escalado y de recuperación de tareas con Amazon EMR en EKS - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Optimización de los tiempos de reinicio de las tareas de Flink para las operaciones de escalado y de recuperación de tareas con Amazon EMR en EKS

Cuando se produce un error en una tarea o se produce una operación de escalado, Flink intenta volver a ejecutar la tarea desde el último punto de control completado. El proceso de reinicio puede tardar un minuto o más en ejecutarse, según el tamaño del estado del punto de control y el número de tareas paralelas. Durante el periodo de reinicio, es posible que se acumulen tareas pendientes para el trabajo. Sin embargo, hay algunas formas en las que Flink optimiza la velocidad de los gráficos de recuperación y reinicio de la ejecución para mejorar la estabilidad del trabajo.

En esta página, se describen algunas de las formas en que Amazon EMR Flink puede mejorar el tiempo de reinicio de un trabajo durante las operaciones de recuperación o escalado en instancias Spot. Las instancias Spot representan capacidad informática no utilizada que está disponible con descuento. Tiene comportamientos únicos, incluidas las interrupciones ocasionales, por lo que es importante entender cómo Amazon EMR en EKS los gestiona, incluida la forma en que Amazon EMR en EKS lleva a cabo el desmantelamiento y el reinicio de los trabajos.

nota

La recuperación local de tareas es compatible a partir de la versión 6.14.0 de Flink en Amazon EMR en EKS.

Con los puntos de control de Flink, cada tarea produce una instantánea de su estado que Flink graba en un almacenamiento distribuido como Amazon S3. En los casos de recuperación, las tareas restauran su estado desde el almacenamiento distribuido. El almacenamiento distribuido ofrece tolerancia a errores y puede redistribuir el estado durante el reescalado, ya que todos los nodos pueden acceder a él.

Sin embargo, un almacén distribuido remoto también tiene una desventaja: todas las tareas deben leer su estado desde una ubicación remota a través de la red. Esto puede provocar tiempos de recuperación prolongados para los estados de gran tamaño durante las operaciones de recuperación o escalado de tareas.

Este problema del tiempo de recuperación prolongado se resuelve mediante la recuperación local de tareas. Las tareas escriben su estado en el punto de control de un almacenamiento secundario que es local para la tarea, por ejemplo, en un disco local. También almacenan su estado en el almacenamiento principal, o Amazon S3 en nuestro caso. Durante la recuperación, el programador programa las tareas en el mismo administrador de tareas en el que se ejecutaron anteriormente, de modo que se puedan recuperar del almacén de estado local en lugar de leerlas del almacén de estado remoto. Para obtener más información, consulte Task-Local Recovery en la documentación de Apache Flink.

Nuestras pruebas comparativas con trabajos de muestra han demostrado que el tiempo de recuperación se ha reducido de unos minutos a unos pocos segundos con la opción de recuperación local de tareas habilitada.

Para habilitar la recuperación local de tareas, defina las siguientes configuraciones en el archivo flink-conf.yaml. Especifique el valor del intervalo de puntos de control en milisegundos.

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint execution.checkpointing.interval: 15000
nota

La recuperación local de tareas mediante Amazon EBS es compatible a partir de la versión 6.15.0 de Flink en Amazon EMR en EKS.

Con Flink en Amazon EMR en EKS, puede aprovisionar automáticamente volúmenes de Amazon EBS al TaskManager pods para la recuperación local de tareas. El soporte superpuesto predeterminado viene con un volumen de 10 GB, suficiente para trabajos con un estado inferior. Los trabajos con estados grandes pueden habilitar la opción de montaje de volumen automático con EBS. La TaskManager Los pods se crean y montan automáticamente durante la creación del pod y se eliminan al eliminarlos.

Realice los siguientes pasos a fin de habilitar el montaje automático del volumen de EBS para Flink en Amazon EMR en EKS:

  1. Exporte los valores de las siguientes variables que utilizará en los próximos pasos.

    export AWS_REGION=aa-example-1 export FLINK_EKS_CLUSTER_NAME=my-cluster export AWS_ACCOUNT_ID=111122223333
  2. Cree o actualice un archivo de kubeconfig de YAML para el clúster.

    aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  3. Cree una cuenta de servicio de IAM para el controlador de Amazon EBS Container Storage Interface (CSI) en su clúster de Amazon EKS.

    eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
  4. Cree un controlador del Amazon EBS CSI con el siguiente comando:

    eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  5. Cree la clase de almacenamiento de Amazon EBS con el siguiente comando:

    cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF

    Y, a continuación, aplique la clase:

    kubectl apply -f storage-class.yaml
  6. Helm instaló el operador Amazon EMR Flink Kubernetes con opciones para crear una cuenta de servicio. Esto crea el emr-containers-sa-flink que se usará en la implementación de Flink.

    helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
  7. Para enviar el trabajo de Flink y habilitar el aprovisionamiento automático de volúmenes de EBS para la recuperación local de tareas, defina las siguientes configuraciones en el archivo flink-conf.yaml. Ajuste el límite de tamaño para el tamaño del estado del trabajo. Establece serviceAccount en emr-containers-sa-flink. Especifique el valor del intervalo de puntos de control en milisegundos. Y omita el executionRoleArn.

    flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://BUCKET-PATH/checkpoint state.backend.local-recovery: true state.backend: hasmap or rocksdb state.backend.incremental: "true" execution.checkpointing.interval: 15000 serviceAccount: emr-containers-sa-flink

Cuando esté listo para eliminar el complemento del controlador CSI de Amazon EBS, utilice los siguientes comandos:

# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
nota

Los puntos de control incrementales genéricos basados en registros son compatibles a partir de la versión 6.14.0 de Flink en Amazon EMR en EKS.

En Flink 1.16, se agregaron puntos de control incrementales genéricos basados en registros para mejorar la velocidad de los puntos de control. Un intervalo de puntos de control más rápido suele reducir el trabajo de recuperación, ya que es necesario volver a procesar menos eventos después de la recuperación. Para obtener más información, consulte Improving speed and stability of checkpointing with generic log-based incremental checkpoints en el blog de Apache Flink.

Con trabajos de muestra, nuestras pruebas comparativas han demostrado que el tiempo de los puntos de control se ha reducido de unos minutos a unos pocos segundos con el punto de control incremental genérico basado en registros.

Para habilitar los puntos de control incrementales genéricos basados en registros, defina las siguientes configuraciones en el archivo flink-conf.yaml. Especifique el valor del intervalo de puntos de control en milisegundos.

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
nota

El soporte de la recuperación detallada para el programador predeterminado es compatible a partir de la versión 6.14.0 de Flink en Amazon EMR en EKS. El soporte de la recuperación detallada para el programador predeterminado está disponible a partir de la versión 6.15.0 de Flink en Amazon EMR en EKS.

Cuando se produce un error en una tarea durante la ejecución, Flink restablece todo el gráfico de ejecución y activa una nueva ejecución completa desde el último punto de control completado. Esto es más caro que volver a ejecutar las tareas con errores. La recuperación detallada reinicia solo el componente conectado a la canalización de la tarea con el error. En el siguiente ejemplo, el gráfico de tareas tiene 5 vértices (de A a E). Todas las conexiones entre los vértices se canalizan con una distribución puntual y el valor parallelism.default para el trabajo se establece en 2.

A → B → C → D → E

En este ejemplo, hay un total de 10 tareas en ejecución. La primera canalización (a1toe1) se ejecuta en un TaskManager (TM1), y la segunda canalización (a2haciae2) corre por otra TaskManager (TM2).

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

Hay dos componentes conectados por canalización: a1 → e1 y a2 → e2. Si TM1 alguna de las dos TM2 falla, la falla solo afecta a las 5 tareas en proceso, donde TaskManager estaba ejecutándose. La estrategia de reinicio solo inicia el componente canalizado afectado.

La recuperación detallada solo funciona con trabajos de Flink perfectamente paralelos. No es compatible con las operaciones keyBy() o redistribute(). Para obtener más información, consulte FLIP-1: Fine Grained Recovery from Task Failures en el proyecto de Jira Flink Improvement Proposal.

Para habilitar la recuperación detallada, establezca las siguientes configuraciones en el archivo flink-conf.yaml.

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
nota

El mecanismo de reinicio combinado del programador adaptativo es compatible a partir de la versión 6.15.0 de Flink en Amazon EMR en EKS.

El programador adaptativo puede ajustar el paralelismo del trabajo en función de los espacios disponibles. Reduce automáticamente el paralelismo si no hay suficientes espacios disponibles para ajustarse al paralelismo del trabajo configurado. Si hay nuevos espacios disponibles, la tarea se escala verticalmente de nuevo al paralelismo del trabajo configurado. Un programador adaptativo evita el tiempo de inactividad del trabajo cuando no hay suficientes recursos disponibles. Este es el programador compatible con el escalador automático de Flink. Recomendamos el programador adaptativo con Amazon EMR Flink por estos motivos. Sin embargo, los programadores adaptativos pueden realizar varios reinicios en un periodo corto, uno por cada nuevo recurso agregado. Esto puede provocar una disminución en el rendimiento del trabajo.

Con Amazon EMR 6.15.0 y versiones posteriores, Flink cuenta con un mecanismo de reinicio combinado en el programador adaptativo que abre una ventana de reinicio cuando se agrega el primer recurso y, a continuación, espera hasta alcanzar el intervalo de ventana configurado del minuto predeterminado. Realiza un único reinicio cuando hay suficientes recursos disponibles para ejecutar el trabajo con el paralelismo configurado o cuando se agota el intervalo.

Con trabajos de muestra, nuestras pruebas comparativas han demostrado que esta característica procesa un 10 % de los registros más que el comportamiento predeterminado cuando se utiliza el programador adaptativo y el escalador automático de Flink.

Para habilitar el mecanismo de reinicio combinado, defina las siguientes configuraciones en el archivo flink-conf.yaml.

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m