Otimização dos tempos de reinicialização de trabalhos do Flink para operações de recuperação e ajuste de escala de tarefas com o Amazon EMR no EKS
Quando uma tarefa falha ou ocorre uma operação de ajuste de escala, o Flink tenta reexecutar a tarefa com base no último ponto de verificação concluído. O processo de reinicialização pode levar um minuto ou mais para ser executado, dependendo do tamanho do estado do ponto de verificação e do número de tarefas paralelas. Durante o período de reinicialização, as tarefas de backlog podem se acumular para o trabalho. No entanto, existem algumas maneiras de o Flink otimizar a velocidade de recuperação e reinicialização dos gráficos de execução para melhorar a estabilidade do trabalho.
Esta página descreve algumas maneiras que o Flink no Amazon EMR pode usar para melhorar o tempo de reinício de trabalhos durante as operações de recuperação ou ajuste de escala de tarefas em instâncias spot. As instâncias spot são capacidades computacionais não utilizadas que estão disponíveis com desconto. Elas têm comportamentos únicos, incluindo interrupções ocasionais, por isso é importante entender como o Amazon EMR no EKS lida com elas e como o Amazon EMR no EKS realiza a desativação e a reinicialização do trabalho.
Tópicos
Recuperação local de tarefas
nota
A recuperação local de tarefas é compatível com o Flink nas versões 6.14.0 e superiores do Amazon EMR no EKS.
Com os pontos de verificação do Flink, cada tarefa produz um snapshot do próprio estado que o Flink grava em um armazenamento distribuído, como o Amazon S3. Em casos de recuperação, as tarefas restauram o estado no armazenamento distribuído. O armazenamento distribuído oferece tolerância a falhas e pode redistribuir o estado durante o reajuste de escala por ser acessível a todos os nós.
No entanto, um armazenamento distribuído remoto também tem uma desvantagem: todas as tarefas devem ler seu estado de um local remoto na rede. Isso pode resultar em tempos de recuperação longos para estados grandes durante operações de recuperação ou ajuste de escala de tarefas.
Esse problema de tempo de recuperação longo é resolvido pela recuperação local de tarefas. As tarefas gravam seu estado no ponto de verificação em um armazenamento secundário local para a tarefa, como em um disco local. Elas também armazenam seu estado no armazenamento principal, ou no Amazon S3, no nosso caso. Durante a recuperação, o agendador programa as tarefas no mesmo gerenciador de tarefas em que elas foram executadas anteriormente, para que possam se recuperar do armazenamento de estado local em vez de ler do armazenamento de estado remoto. Para obter mais informações, consulte Task-Local Recovery
Nossos testes de benchmark com exemplos de trabalhos mostraram que o tempo de recuperação foi reduzido de minutos para alguns segundos com a recuperação local da tarefa ativada.
Para habilitar a recuperação local de tarefas, defina as configurações a seguir no seu arquivo flink-conf.yaml
. Especifique o valor do intervalo do ponto de verificação em milissegundos.
state.backend.local-recovery: true state.backend:
hasmap or rocksdb
state.checkpoints.dir: s3://STORAGE-BUCKET-PATH
/checkpoint execution.checkpointing.interval:15000
Recuperação local de tarefas por meio da montagem de volume do Amazon EBS
nota
A recuperação local de tarefas por meio do Amazon EBS é compatível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.
Com o Flink para Amazon EMR no EKS, você pode provisionar automaticamente os volumes do Amazon EBS nos pods do TaskManager para a recuperação local de tarefas. A montagem de sobreposição padrão vem com um volume de 10 GB, o que é suficiente para trabalhos com um estado inferior. Trabalhos com estados grandes podem habilitar a opção de montagem automática de volume do EBS. Os pods do TaskManager são criados e montados automaticamente durante a criação do pod e removidos durante a exclusão do pod.
Use as etapas a seguir para habilitar a montagem automática de volume do EBS para o Flink no Amazon EMR no EKS.
-
Exporte os valores das seguintes variáveis que você usará nas próximas etapas:
export AWS_REGION=
aa-example-1
export FLINK_EKS_CLUSTER_NAME=my-cluster
export AWS_ACCOUNT_ID=111122223333
-
Crie ou atualize um arquivo
kubeconfig
YAML para o cluster.aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
-
Crie uma conta de serviço do IAM para o driver de interface de armazenamento de contêiner (CSI) do Amazon EBS no cluster do Amazon EKS.
eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
-
Crie o driver CSI do Amazon EBS com o seguinte comando:
eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
-
Crie a classe de armazenamento do Amazon EBS com o seguinte comando:
cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF
Depois, aplique a classe:
kubectl apply -f storage-class.yaml
-
O Helm instala o operador do Kubernetes para Flink do Amazon EMR com opções para criar uma conta de serviço. Isso cria o
emr-containers-sa-flink
para ser usado na implantação do Flink.helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
-
Para enviar o trabalho do Flink e habilitar o provisionamento automático de volumes do EBS para a recuperação local de tarefas, defina as configurações a seguir no seu arquivo
flink-conf.yaml
. Ajuste o limite de tamanho para o tamanho do estado do trabalho. DefinaserviceAccount
comoemr-containers-sa-flink
. Especifique o valor do intervalo do ponto de verificação em milissegundos. Omita oexecutionRoleArn
.flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://
BUCKET-PATH
/checkpoint state.backend.local-recovery: true state.backend:hasmap or rocksdb
state.backend.incremental: "true" execution.checkpointing.interval:15000
serviceAccount: emr-containers-sa-flink
Quando estiver tudo pronto para excluir o plug-in do driver CSI do Amazon EBS, use os seguintes comandos:
# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
Ponto de verificação incremental genérico baseado em logs
nota
O ponto de verificação incremental genérico baseado em logs é compatível com o Flink para as versões 6.14.0 e superiores do Amazon EMR no EKS.
O recurso de ponto de verificação incremental genérico baseado em logs foi adicionado ao Flink 1.16 para melhorar a velocidade dos pontos de verificação. Um intervalo de ponto de verificação mais rápido costuma resultar em uma redução do trabalho de recuperação porque menos eventos precisam ser reprocessados após a recuperação. Para obter mais informações, consulte Improving speed and stability of checkpointing with generic log-based incremental checkpoints
Com exemplos de trabalhos, nossos testes de comparação mostraram que o tempo do ponto de verificação foi reduzido de minutos para alguns segundos com o ponto de verificação incremental genérico baseado em logs.
Para habilitar os pontos de verificação incrementais genéricos baseados em logs, defina as configurações a seguir no seu arquivo flink-conf.yaml
. Especifique o valor do intervalo do ponto de verificação em milissegundos.
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
Recuperação refinada
nota
O suporte de recuperação refinada para o agendador padrão é compatível com o Flink nas versões 6.14.0 e superiores do Amazon EMR no EKS. O suporte de recuperação refinada no agendador adaptável está disponível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.
Quando uma tarefa falha durante a execução, o Flink redefine todo o gráfico de execução e aciona a reexecução completa com base no último ponto de verificação concluído. Isso é mais caro do que apenas reexecutar as tarefas que falharam. A recuperação refinada reinicia somente o componente conectado ao pipeline da tarefa que falhou. No exemplo a seguir, o gráfico do trabalho tem cinco vértices (de A
a E
). É feito o pipeline de todas as conexões entre os vértices com distribuição pontual, e o parallelism.default
do trabalho é definido como 2
.
A → B → C → D → E
Neste exemplo, há um total de dez tarefas em execução. O primeiro pipeline (de a1
a e1
) é executado em um TaskManager (TM1
) e o segundo pipeline (de a2
a e2
) é executado em outro TaskManager (TM2
).
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
Há dois componentes conectados por pipeline: a1 → e1
e a2 →
e2
. Se TM1
ou TM2
falhar, a falha afetará somente as cinco tarefas no pipeline em que TaskManager estava em execução. A estratégia de reinicialização só inicia o componente do pipeline afetado.
A recuperação refinada funciona somente com trabalhos perfeitamente paralelos do Flink. Não é compatível com operações de keyBy()
ou redistribute()
. Para obter mais informações, consulte FLIP-1: Fine Grained Recovery from Task Failures
Para habilitar a recuperação refinada, defina as configurações a seguir no seu arquivo flink-conf.yaml
.
jobmanager.execution.failover-strategy: region restart-strategy:
exponential-delay or fixed-delay
Mecanismo de reinício combinado no agendador adaptável
nota
O mecanismo de reinicialização combinado no agendador adaptável é compatível com o Flink nas versões 6.15.0 e superiores do Amazon EMR no EKS.
O agendador adaptável pode ajustar o paralelismo do trabalho com base nos slots disponíveis. Ele reduz automaticamente o paralelismo se não houver slots suficientes disponíveis para atender ao paralelismo do trabalho configurado. Se novos slots ficarem disponíveis, o trabalho aumentará a escala verticalmente mais uma vez para o paralelismo do trabalho configurado. Um agendador adaptável evita o tempo de inatividade no trabalho quando não há recursos suficientes disponíveis. Esse é o agendador compatível com o escalador automático do Flink. Recomendamos o agendador adaptável com o Flink no Amazon EMR por esses motivos. No entanto, os agendadores adaptáveis podem fazer várias reinicializações em um curto período; uma reinicialização para cada novo recurso adicionado. Isso pode levar a uma queda na performance do trabalho.
Com o Amazon EMR 6.15.0 e versões superiores, o Flink tem um mecanismo de reinício combinado no agendador adaptável que abre uma janela de reinicialização quando o primeiro recurso é adicionado e aguarda pelo intervalo configurado da janela do padrão de um minuto. Ele executa uma única reinicialização quando há recursos suficientes disponíveis para executar o trabalho com o paralelismo configurado ou quando o intervalo expira.
Com os exemplos de trabalhos, nossos testes de comparação mostraram que esse recurso processa 10% a mais dos registros do que o comportamento padrão ao usar o agendador adaptável e o escalador automático do Flink.
Para habilitar o mecanismo de reinício combinado, defina as configurações a seguir no seu arquivo flink-conf.yaml
.
jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m