Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Ottimizzazione dei tempi di riavvio dei processi Flink per le operazioni di ripristino delle attività e dimensionamento con Amazon EMR su EKS
Quando un'attività non riesce o quando si verifica un'operazione di dimensionamento, Flink tenta di rieseguire l'attività dall'ultimo checkpoint completato. L'esecuzione del processo di riavvio potrebbe richiedere un minuto o più, a seconda delle dimensioni dello stato del checkpoint e del numero di attività parallele. Durante il periodo di riavvio, le attività di backlog relative al processo possono accumularsi. Esistono tuttavia alcuni modi in cui Flink ottimizza la velocità di ripristino e riavvio dei grafici di esecuzione per migliorare la stabilità del processo.
Questa pagina descrive alcuni dei modi in cui Amazon EMR Flink può migliorare il tempo di riavvio del lavoro durante il ripristino delle attività o la scalabilità delle operazioni su istanze locali. Le istanze Spot sono capacità di elaborazione inutilizzata disponibile a un prezzo scontato. Ha comportamenti unici, tra cui interruzioni occasionali, quindi è importante capire come Amazon EMR su EKS li gestisce, incluso il modo in cui Amazon EMR on EKS esegue lo smantellamento e il riavvio dei lavori.
Argomenti
Ripristino locale delle attività
Nota
Il ripristino locale delle attività è supportato con Flink su Amazon EMR su EKS 6.14.0 e versioni successive.
Con i checkpoint Flink, ogni attività produce un'istantanea del suo stato che Flink scrive in uno storage distribuito come Amazon S3. In caso di ripristino, le attività recuperano il loro stato dall'archiviazione distribuita. L'archiviazione distribuita offre tolleranza ai guasti e può ridistribuire lo stato durante il dimensionamento perché è accessibile a tutti i nodi.
Tuttavia, un archivio distribuito remoto presenta anche uno svantaggio: tutte le attività devono leggere il proprio stato da una posizione remota della rete. Ciò può comportare lunghi tempi di ripristino per stati di grandi dimensioni durante le operazioni di ripristino delle attività o di dimensionamento.
Il problema dei lunghi tempi di ripristino viene risolto mediante il ripristino locale delle attività. Le attività scrivono il loro stato su checkpoint in una memoria secondaria locale all'attività, ad esempio su un disco locale. Inoltre, memorizzano il loro stato nell'archiviazione principale, o su Amazon S3, come nel nostro caso. Durante il ripristino, lo scheduler pianifica le attività sullo stesso Task Manager in cui le attività erano state eseguite in precedenza, in modo che possano essere ripristinate dall'archivio di stato locale anziché essere lette dall'archivio di stato remoto. Per ulteriori informazioni, consulta l'argomento relativo al ripristino locale delle attività
I nostri test di benchmark con processi di esempio hanno dimostrato che il tempo di ripristino è stato ridotto da pochi minuti a pochi secondi con il ripristino locale delle attività abilitato.
Per abilitare il ripristino locale delle attività, imposta le seguenti configurazioni nel file flink-conf.yaml
. Specifica il valore dell'intervallo di checkpoint in millisecondi.
state.backend.local-recovery: true state.backend:
hasmap or rocksdb
state.checkpoints.dir: s3://STORAGE-BUCKET-PATH
/checkpoint execution.checkpointing.interval:15000
Ripristino locale delle attività tramite montaggio su volumi Amazon EBS
Nota
Il ripristino locale delle attività di Amazon EBS è supportato con Flink su Amazon EMR su EKS 6.15.0 e versioni successive.
Con Flink su Amazon EMR su EKS, puoi effettuare automaticamente il provisioning di volumi Amazon EBS a TaskManager pod per il ripristino locale delle attività. Il montaggio di un overlay predefinito include un volume di 10 GB, sufficiente per processi con uno stato inferiore. I lavori con stati di grandi dimensioni possono abilitare l'opzione montaggio automatico del volume EBS. Il TaskManager i pod vengono creati e montati automaticamente durante la creazione dei pod e rimossi durante l'eliminazione dei pod.
Utilizza i seguenti passaggi per abilitare il montaggio automatico dei volumi EBS per Flink in Amazon EMR su EKS:
-
Esporta i valori per le seguenti variabili che utilizzerai nei passaggi successivi.
export AWS_REGION=
aa-example-1
export FLINK_EKS_CLUSTER_NAME=my-cluster
export AWS_ACCOUNT_ID=111122223333
-
Crea o aggiorna un file YAML
kubeconfig
per il cluster.aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
-
Crea un account di servizio IAM per il driver CSI (Container Storage Interface) di Amazon EBS sul cluster Amazon EKS.
eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
-
Crea il driver CSI di Amazon EBS con il seguente comando:
eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
-
Crea la classe di storage Amazon EBS con il seguente comando:
cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF
Quindi applica la classe:
kubectl apply -f storage-class.yaml
-
Helm installa l'operatore Kubernetes di Amazon EMR Flink con opzioni per creare un account di servizio. Questa operazione crea il
emr-containers-sa-flink
da utilizzare nell'implementazione Flink.helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
-
Per inviare il processo Flink e abilitare la fornitura automatica di volumi EBS per il ripristino locale delle attività, imposta le seguenti configurazioni nel file
flink-conf.yaml
. Regola il limite di dimensione in base alla dimensione dello stato del processo. ImpostaserviceAccount
suemr-containers-sa-flink
. Specifica il valore dell'intervallo di checkpoint in millisecondi. Quindi, omettiexecutionRoleArn
.flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://
BUCKET-PATH
/checkpoint state.backend.local-recovery: true state.backend:hasmap or rocksdb
state.backend.incremental: "true" execution.checkpointing.interval:15000
serviceAccount: emr-containers-sa-flink
Quando sei pronto per eliminare il plug-in del driver CSI di Amazon EBS, usa i seguenti comandi:
# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
Checkpoint incrementale generico basato su log
Nota
Il checkpoint incrementale generico basato su log è supportato con Flink su Amazon EMR su EKS 6.14.0 e versioni successive.
Il checkpoint incrementale generico basato su log è stato aggiunto in Flink 1.16 per migliorare la velocità dei checkpoint. Un intervallo di checkpoint più rapido spesso comporta una riduzione del lavoro di ripristino perché è necessario rielaborare un minor numero di eventi dopo il ripristino. Per ulteriori informazioni, consulta Improving speed and stability of checkpointing with generic log-based incremental checkpoints
Con processi di esempio, i nostri test di benchmark hanno dimostrato che il tempo di checkpoint si è ridotto da pochi minuti a pochi secondi con il checkpoint incrementale generico basato su log.
Per abilitare i checkpoint incrementali generici basati su log, imposta le seguenti configurazioni nel tuo file flink-conf.yaml
. Specifica il valore dell'intervallo di checkpoint in millisecondi.
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
Ripristino granulare
Nota
Il supporto per il ripristino granulare nel pianificatore predefinito è supportato con Flink su Amazon EMR su EKS 6.14.0 e versioni successive. Il supporto per il ripristino granulare nel pianificatore adattivo è disponibile con Flink su Amazon EMR su EKS 6.15.0 e versioni successive.
Quando un'attività riporta un errore durante l'esecuzione, Flink reimposta l'intero grafico di esecuzione e attiva una riesecuzione completa dall'ultimo checkpoint completato. Questa procedura è più costosa della semplice riesecuzione delle attività non riuscite. Il ripristino granulare riavvia solo il componente connesso alla pipeline dell'attività non riuscita. Nell'esempio seguente, il grafico del processo ha 5 vertici (da A
a E
). Tutte le connessioni tra i vertici avvengono tramite pipeline con distribuzione uniforme e il comando parallelism.default
per il processo è impostato su 2
.
A → B → C → D → E
Per questo esempio, le attività totali in esecuzione sono 10. La prima pipeline (a1
toe1
) viene eseguita su un TaskManager (TM1
) e la seconda pipeline (a2
toe2
) viene eseguita su un'altra TaskManager (TM2
).
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
Esistono due componenti collegati tramite pipeline: a1 → e1
e a2 →
e2
. Se una delle due TM1
attività TM2
fallisce, l'errore ha effetto solo sulle 5 attività della pipeline in cui TaskManager era in esecuzione. La strategia di riavvio avvia solo il componente della pipeline interessato.
Il ripristino granulare funziona solo con processi Flink perfettamente paralleli. Non è supportato con le operazioni keyBy()
o redistribute()
. Per ulteriori informazioni, consulta FLIP-1: Fine Grained Recovery from Task Failures
Per abilitare il ripristino granulare, imposta le seguenti configurazioni nel file flink-conf.yaml
.
jobmanager.execution.failover-strategy: region restart-strategy:
exponential-delay or fixed-delay
Meccanismo di riavvio combinato nel pianificatore adattivo
Nota
Il meccanismo di riavvio combinato nel pianificatore adattivo è supportato con Flink su Amazon EMR su EKS 6.15.0 e versioni successive.
Il pianificatore adattivo può regolare il parallelismo del processo in base agli slot disponibili. Riduce automaticamente il parallelismo se non sono disponibili abbastanza slot per soddisfare il parallelismo configurato del processo. Se diventano disponibili nuovi slot, il processo viene nuovamente dimensionato in base al parallelismo configurato del processo. Un pianificatore adattivo evita i tempi di inattività del processo quando le risorse disponibili non sono sufficienti. Questo è il pianificatore supportato per Autoscaler di Flink. Per questi motivi, con Flink di Amazon EMR consigliamo il pianificatore adattivo. Tuttavia, i pianificatori adattivi potrebbero eseguire più riavvii in un breve periodo di tempo, un riavvio per ogni nuova risorsa aggiunta. Questo potrebbe comportare un calo delle prestazioni nel processo.
Con Amazon EMR 6.15.0 e versioni successive, Flink dispone di un meccanismo di riavvio combinato nel pianificatore adattivo che apre una finestra di riavvio quando viene aggiunta la prima risorsa e quindi attende fino all'intervallo di finestra configurato di 1 minuto predefinito. Esegue un singolo riavvio quando sono disponibili risorse sufficienti per eseguire il processo con il parallelismo configurato o quando scade l'intervallo.
Con processi di esempio, i nostri test di benchmark hanno dimostrato che questa funzionalità elabora il 10% dei record in più rispetto al comportamento predefinito quando si utilizzano pianificatori adattivi e autoscaler di Flink.
Per abilitare il meccanismo di riavvio combinato, imposta le seguenti configurazioni nel file flink-conf.yaml
.
jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m