Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Optimierung der Neustartzeiten von Flink-Jobs für Aufgabenwiederherstellungs- und Skalierungsvorgänge bei aktiviertem Amazon EMR EKS
Wenn eine Aufgabe fehlschlägt oder wenn ein Skalierungsvorgang stattfindet, versucht Flink, die Aufgabe vom letzten abgeschlossenen Prüfpunkt aus erneut auszuführen. Die Ausführung des Neustartvorgangs kann eine Minute oder länger dauern, abhängig von der Größe des Prüfpunktzustands und der Anzahl der parallelen Aufgaben. Während des Neustarts können sich Backlog-Aufgaben für den Auftrag ansammeln. Es gibt jedoch einige Möglichkeiten, wie Flink die Geschwindigkeit der Wiederherstellung und des Neustarts von Ausführungsdiagrammen optimiert, um die Auftragsstabilität zu verbessern.
Auf dieser Seite werden einige Möglichkeiten beschrieben, mit denen Amazon EMR Flink die Zeit für den Neustart von Jobs bei der Wiederherstellung von Aufgaben oder bei Skalierungsvorgängen auf Spot-Instances verbessern kann. Spot-Instances sind ungenutzte Rechenkapazität, die mit einem discount erhältlich ist. Es weist einzigartige Verhaltensweisen auf, einschließlich gelegentlicher Unterbrechungen. Daher ist es wichtig zu verstehen, wie Amazon EMR on mit diesen EKS umgeht, einschließlich der Art und Weise, wie Amazon EMR on die Außerbetriebnahme und Neustarts von Jobs EKS durchführt.
Themen
Aufgabenlokale Wiederherstellung
Anmerkung
Task-Local Recovery wird mit Flink on Amazon EMR auf Version EKS 6.14.0 und höher unterstützt.
Mit Flink-Prüfpunkten erstellt jede Aufgabe einen Snapshot ihres Status, den Flink in verteilte Speicher wie Amazon S3 schreibt. Im Falle einer Wiederherstellung stellen die Aufgaben ihren Status aus dem verteilten Speicher wieder her. Der verteilte Speicher bietet Fehlertoleranz und kann den Status während der Neuskalierung neu verteilen, da er für alle Knoten zugänglich ist.
Ein verteilter Remote-Speicher hat jedoch auch einen Nachteil: Alle Aufgaben müssen ihren Status von einem entfernten Standort aus über das Netzwerk lesen. Dies kann bei der Aufgabenwiederherstellung oder bei Skalierungsvorgängen zu langen Wiederherstellungszeiten für große Zustände führen.
Dieses Problem der langen Wiederherstellungszeit wird durch eine aufgabenlokale Wiederherstellung gelöst. Aufgaben schreiben ihren Status am Prüfüunkt in einen sekundären Speicher, der sich lokal zur Aufgabe befindet, z. B. auf eine lokale Festplatte. Sie speichern ihren Status auch im Primärspeicher oder in unserem Fall in Amazon S3. Während der Wiederherstellung plant der Scheduler die Aufgaben in demselben Task-Manager, in dem die Aufgaben zuvor ausgeführt wurden, sodass sie aus dem lokalen Statusspeicher wiederhergestellt werden können, anstatt sie aus dem Remote-Statusspeicher zu lesen. Weitere Informationen finden Sie unter Aufgabenlokale Wiederherstellung
Unsere Benchmark-Tests mit Beispielaufträgen haben gezeigt, dass die Wiederherstellungszeit bei aktivierter aufgabenlokaler Wiederherstellung von Minuten auf wenige Sekunden reduziert wurde.
Um die aufgabenlokale Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer flink-conf.yaml
-Datei fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.
state.backend.local-recovery: true state.backend:
hasmap or rocksdb
state.checkpoints.dir: s3://STORAGE-BUCKET-PATH
/checkpoint execution.checkpointing.interval:15000
Task-lokale Wiederherstellung durch Amazon Volume Mount EBS
Anmerkung
Task-Local Recovery von Amazon EBS wird mit Flink on Amazon EMR auf Version EKS 6.15.0 und höher unterstützt.
Wenn Flink EMR on Amazon aktiviert istEKS, können Sie automatisch EBS Amazon-Volumes für TaskManager Pods für die lokale Wiederherstellung von Aufgaben. Der Standard-Overlay-Mount verfügt über ein Volumen von 10 GB, was für Aufträge mit einem niedrigeren Status ausreichend ist. Bei Aufträgen mit großen Status kann die Option zum automatischen Einbinden von EBS Volumes aktiviert werden. Das Tool TaskManager Pods werden bei der Pod-Erstellung automatisch erstellt und bereitgestellt und beim Löschen des Pods entfernt.
Gehen Sie wie folgt vor, um die automatische EBS Volumenanpassung für Flink in Amazon EMR am EKS zu aktivieren:
-
Exportieren Sie die Werte für die folgenden Variablen, die Sie in den nächsten Schritten verwenden werden.
export AWS_REGION=
aa-example-1
export FLINK_EKS_CLUSTER_NAME=my-cluster
export AWS_ACCOUNT_ID=111122223333
-
Erstellen oder aktualisieren Sie eine
kubeconfig
YAML Datei für Ihren Cluster.aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
-
Erstellen Sie ein IAM Servicekonto für den Amazon EBS Container Storage Interface (CSI) -Treiber in Ihrem EKS Amazon-Cluster.
eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
-
Erstellen Sie den EBS CSI Amazon-Treiber mit dem folgenden Befehl:
eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
-
Erstellen Sie die EBS Amazon-Speicherklasse mit dem folgenden Befehl:
cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF
Und wenden Sie dann die Klasse an:
kubectl apply -f storage-class.yaml
-
Helm installiert den Amazon EMR Flink Kubernetes-Operator mit Optionen zum Erstellen eines Dienstkontos. Dadurch wird der
emr-containers-sa-flink
erstellt, der in der Flink-Bereitstellung verwendet werden soll.helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
-
Um den Flink-Job einzureichen und die automatische Bereitstellung von EBS Volumes für die aufgabenlokale Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei fest.
flink-conf.yaml
Passen Sie die Größenbeschränkung an die Statusgröße des Auftrags an. Setzen SieserviceAccount
aufemr-containers-sa-flink
. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an. Und lassen Sie denexecutionRoleArn
weg.flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://
BUCKET-PATH
/checkpoint state.backend.local-recovery: true state.backend:hasmap or rocksdb
state.backend.incremental: "true" execution.checkpointing.interval:15000
serviceAccount: emr-containers-sa-flink
Wenn Sie bereit sind, das EBS CSI Amazon-Treiber-Plugin zu löschen, verwenden Sie die folgenden Befehle:
# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
Generischer protokollbasierter inkrementeller Prüfpunkt
Anmerkung
Generisches logbasiertes inkrementelles Checkpointing wird mit Flink on Amazon EMR auf EKS Version 6.14.0 und höher unterstützt.
Generische protokollbasierte inkrementelle Prüfpunkte wurden in Flink 1.16 hinzugefügt, um die Geschwindigkeit von Prüfpunkten zu verbessern. Ein schnelleres Prüfpunktintervall führt häufig zu einer Reduzierung des Wiederherstellungsaufwands, da weniger Ereignisse nach der Wiederherstellung erneut verarbeitet werden müssen. Weitere Informationen finden Sie im Apache-Flink-Blog unter Verbesserung der Geschwindigkeit und Stabilität von Prüfpunkten mit generischen protokollbasierten inkrementellen Prüfpunkten
Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass sich die Prüfpunktzeit mit dem generischen protokollbasierten inkrementellen Prüfpunkt von Minuten auf wenige Sekunden reduziert hat.
Um generische protokollbasierte inkrementelle Prüfpunkte zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei flink-conf.yaml
fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
Differenzierte Wiederherstellung
Anmerkung
Eine detaillierte Wiederherstellungsunterstützung für den Standard-Scheduler wird mit Flink on Amazon EMR auf EKS 6.14.0 und höher unterstützt. Die detaillierte Wiederherstellungsunterstützung im Adaptive Scheduler ist mit Flink EMR auf EKS Amazon ab Version 6.15.0 verfügbar.
Wenn eine Aufgabe während der Ausführung fehlschlägt, setzt Flink das gesamte Ausführungsdiagramm zurück und löst eine vollständige Neuausführung ab dem letzten abgeschlossenen Prüfpunkt aus. Das ist teurer, als nur die fehlgeschlagenen Aufgaben erneut auszuführen. Bei einer differenzierten Wiederherstellung wird nur die mit der Pipeline verbundene Komponente der fehlgeschlagenen Aufgabe neu gestartet. Im folgenden Beispiel hat das Auftragsdiagramm 5 Scheitelpunkte (A
bis E
). Alle Verbindungen zwischen den Scheitelpunkten werden punktweise in Pipelines verlegt, und der Wert parallelism.default
für den Auftrag ist auf 2
eingestellt.
A → B → C → D → E
In diesem Beispiel werden insgesamt 10 Aufgaben ausgeführt. Die erste Pipeline (zu) läuft auf einem a1
e1
TaskManager (TM1
), und die zweite Pipeline (a2
toe2
) läuft auf einer anderen TaskManager (TM2
).
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
Es gibt zwei Komponenten, die über eine Pipeline miteinander verbunden sind: a1 → e1
und a2 →
e2
. Wenn entweder TM1
oder TM2
fehlschlägt, wirkt sich der Fehler nur auf die 5 Aufgaben in der Pipeline aus, bei denen TaskManager lief. Bei der Neustartstrategie wird nur die betroffene Pipeline-Komponente gestartet.
Eine differenzierte Wiederherstellung funktioniert nur mit perfekt parallelen Flink-Aufträgen. Sie wird nicht mit keyBy()
- oder redistribute()
-Vorgängen unterstützt. Weitere Informationen finden Sie unter FLIP-1: Fine Grained Recovery from Task Failures im Jira-Projekt
Um die differenzierte Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer flink-conf.yaml
-Datei fest.
jobmanager.execution.failover-strategy: region restart-strategy:
exponential-delay or fixed-delay
Kombinierter Neustartmechanismus im adaptiven Scheduler
Anmerkung
Der kombinierte Neustartmechanismus im Adaptive Scheduler wird mit Flink on Amazon EMR auf EKS 6.15.0 und höher unterstützt.
Der adaptive Scheduler kann die Parallelität des Auftrags auf der Grundlage der verfügbaren Slots anpassen. Er reduziert automatisch die Parallelität, wenn nicht genügend Slots für die konfigurierte Auftragsparallelität verfügbar sind. Wenn neue Slots verfügbar werden, wird der Auftrag wieder auf die konfigurierte Auftragsparallelität hochskaliert. Ein adaptiver Scheduler vermeidet Ausfallzeiten beim Auftrag, wenn nicht genügend Ressourcen verfügbar sind. Dies ist der unterstützte Scheduler für Flink Autoscaler. Aus diesen Gründen empfehlen wir den Adaptive Scheduler mit Amazon EMR Flink. Adaptive Scheduler können jedoch innerhalb kurzer Zeit mehrere Neustarts durchführen, und zwar einen Neustart für jede neu hinzugefügte Ressource. Dies könnte zu einem Leistungsabfall des Auftrags führen.
Mit Amazon EMR 6.15.0 und höher verfügt Flink über einen kombinierten Neustartmechanismus im Adaptive Scheduler, der ein Neustartfenster öffnet, wenn die erste Ressource hinzugefügt wird, und dann bis zum konfigurierten Fensterintervall von standardmäßig 1 Minute wartet. Er führt einen einzigen Neustart durch, wenn genügend Ressourcen zur Verfügung stehen, um den Auftrag mit konfigurierter Parallelität auszuführen, oder wenn das Intervall abgelaufen ist.
Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass dieses Feature 10 % mehr Datensätze verarbeitet als das Standardverhalten, wenn Sie den adaptiven Scheduler und Flink Autoscaler verwenden.
Um den kombinierten Neustartmechanismus zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei flink-conf.yaml
fest.
jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m