Optimierung der Neustartzeiten von Flink-Jobs für Aufgabenwiederherstellungs- und Skalierungsvorgänge bei aktiviertem Amazon EMR EKS - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Optimierung der Neustartzeiten von Flink-Jobs für Aufgabenwiederherstellungs- und Skalierungsvorgänge bei aktiviertem Amazon EMR EKS

Wenn eine Aufgabe fehlschlägt oder wenn ein Skalierungsvorgang stattfindet, versucht Flink, die Aufgabe vom letzten abgeschlossenen Prüfpunkt aus erneut auszuführen. Die Ausführung des Neustartvorgangs kann eine Minute oder länger dauern, abhängig von der Größe des Prüfpunktzustands und der Anzahl der parallelen Aufgaben. Während des Neustarts können sich Backlog-Aufgaben für den Auftrag ansammeln. Es gibt jedoch einige Möglichkeiten, wie Flink die Geschwindigkeit der Wiederherstellung und des Neustarts von Ausführungsdiagrammen optimiert, um die Auftragsstabilität zu verbessern.

Auf dieser Seite werden einige Möglichkeiten beschrieben, mit denen Amazon EMR Flink die Zeit für den Neustart von Jobs bei der Wiederherstellung von Aufgaben oder bei Skalierungsvorgängen auf Spot-Instances verbessern kann. Spot-Instances sind ungenutzte Rechenkapazität, die mit einem discount erhältlich ist. Es weist einzigartige Verhaltensweisen auf, einschließlich gelegentlicher Unterbrechungen. Daher ist es wichtig zu verstehen, wie Amazon EMR on mit diesen EKS umgeht, einschließlich der Art und Weise, wie Amazon EMR on die Außerbetriebnahme und Neustarts von Jobs EKS durchführt.

Anmerkung

Task-Local Recovery wird mit Flink on Amazon EMR auf Version EKS 6.14.0 und höher unterstützt.

Mit Flink-Prüfpunkten erstellt jede Aufgabe einen Snapshot ihres Status, den Flink in verteilte Speicher wie Amazon S3 schreibt. Im Falle einer Wiederherstellung stellen die Aufgaben ihren Status aus dem verteilten Speicher wieder her. Der verteilte Speicher bietet Fehlertoleranz und kann den Status während der Neuskalierung neu verteilen, da er für alle Knoten zugänglich ist.

Ein verteilter Remote-Speicher hat jedoch auch einen Nachteil: Alle Aufgaben müssen ihren Status von einem entfernten Standort aus über das Netzwerk lesen. Dies kann bei der Aufgabenwiederherstellung oder bei Skalierungsvorgängen zu langen Wiederherstellungszeiten für große Zustände führen.

Dieses Problem der langen Wiederherstellungszeit wird durch eine aufgabenlokale Wiederherstellung gelöst. Aufgaben schreiben ihren Status am Prüfüunkt in einen sekundären Speicher, der sich lokal zur Aufgabe befindet, z. B. auf eine lokale Festplatte. Sie speichern ihren Status auch im Primärspeicher oder in unserem Fall in Amazon S3. Während der Wiederherstellung plant der Scheduler die Aufgaben in demselben Task-Manager, in dem die Aufgaben zuvor ausgeführt wurden, sodass sie aus dem lokalen Statusspeicher wiederhergestellt werden können, anstatt sie aus dem Remote-Statusspeicher zu lesen. Weitere Informationen finden Sie unter Aufgabenlokale Wiederherstellung in der Apache-Flink-Dokumentation.

Unsere Benchmark-Tests mit Beispielaufträgen haben gezeigt, dass die Wiederherstellungszeit bei aktivierter aufgabenlokaler Wiederherstellung von Minuten auf wenige Sekunden reduziert wurde.

Um die aufgabenlokale Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer flink-conf.yaml-Datei fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint execution.checkpointing.interval: 15000
Anmerkung

Task-Local Recovery von Amazon EBS wird mit Flink on Amazon EMR auf Version EKS 6.15.0 und höher unterstützt.

Wenn Flink EMR on Amazon aktiviert istEKS, können Sie automatisch EBS Amazon-Volumes für TaskManager Pods für die lokale Wiederherstellung von Aufgaben. Der Standard-Overlay-Mount verfügt über ein Volumen von 10 GB, was für Aufträge mit einem niedrigeren Status ausreichend ist. Bei Aufträgen mit großen Status kann die Option zum automatischen Einbinden von EBS Volumes aktiviert werden. Das Tool TaskManager Pods werden bei der Pod-Erstellung automatisch erstellt und bereitgestellt und beim Löschen des Pods entfernt.

Gehen Sie wie folgt vor, um die automatische EBS Volumenanpassung für Flink in Amazon EMR am EKS zu aktivieren:

  1. Exportieren Sie die Werte für die folgenden Variablen, die Sie in den nächsten Schritten verwenden werden.

    export AWS_REGION=aa-example-1 export FLINK_EKS_CLUSTER_NAME=my-cluster export AWS_ACCOUNT_ID=111122223333
  2. Erstellen oder aktualisieren Sie eine kubeconfig YAML Datei für Ihren Cluster.

    aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  3. Erstellen Sie ein IAM Servicekonto für den Amazon EBS Container Storage Interface (CSI) -Treiber in Ihrem EKS Amazon-Cluster.

    eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
  4. Erstellen Sie den EBS CSI Amazon-Treiber mit dem folgenden Befehl:

    eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  5. Erstellen Sie die EBS Amazon-Speicherklasse mit dem folgenden Befehl:

    cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF

    Und wenden Sie dann die Klasse an:

    kubectl apply -f storage-class.yaml
  6. Helm installiert den Amazon EMR Flink Kubernetes-Operator mit Optionen zum Erstellen eines Dienstkontos. Dadurch wird der emr-containers-sa-flink erstellt, der in der Flink-Bereitstellung verwendet werden soll.

    helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
  7. Um den Flink-Job einzureichen und die automatische Bereitstellung von EBS Volumes für die aufgabenlokale Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei fest. flink-conf.yaml Passen Sie die Größenbeschränkung an die Statusgröße des Auftrags an. Setzen Sie serviceAccount auf emr-containers-sa-flink. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an. Und lassen Sie den executionRoleArn weg.

    flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://BUCKET-PATH/checkpoint state.backend.local-recovery: true state.backend: hasmap or rocksdb state.backend.incremental: "true" execution.checkpointing.interval: 15000 serviceAccount: emr-containers-sa-flink

Wenn Sie bereit sind, das EBS CSI Amazon-Treiber-Plugin zu löschen, verwenden Sie die folgenden Befehle:

# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
Anmerkung

Generisches logbasiertes inkrementelles Checkpointing wird mit Flink on Amazon EMR auf EKS Version 6.14.0 und höher unterstützt.

Generische protokollbasierte inkrementelle Prüfpunkte wurden in Flink 1.16 hinzugefügt, um die Geschwindigkeit von Prüfpunkten zu verbessern. Ein schnelleres Prüfpunktintervall führt häufig zu einer Reduzierung des Wiederherstellungsaufwands, da weniger Ereignisse nach der Wiederherstellung erneut verarbeitet werden müssen. Weitere Informationen finden Sie im Apache-Flink-Blog unter Verbesserung der Geschwindigkeit und Stabilität von Prüfpunkten mit generischen protokollbasierten inkrementellen Prüfpunkten.

Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass sich die Prüfpunktzeit mit dem generischen protokollbasierten inkrementellen Prüfpunkt von Minuten auf wenige Sekunden reduziert hat.

Um generische protokollbasierte inkrementelle Prüfpunkte zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei flink-conf.yaml fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
Anmerkung

Eine detaillierte Wiederherstellungsunterstützung für den Standard-Scheduler wird mit Flink on Amazon EMR auf EKS 6.14.0 und höher unterstützt. Die detaillierte Wiederherstellungsunterstützung im Adaptive Scheduler ist mit Flink EMR auf EKS Amazon ab Version 6.15.0 verfügbar.

Wenn eine Aufgabe während der Ausführung fehlschlägt, setzt Flink das gesamte Ausführungsdiagramm zurück und löst eine vollständige Neuausführung ab dem letzten abgeschlossenen Prüfpunkt aus. Das ist teurer, als nur die fehlgeschlagenen Aufgaben erneut auszuführen. Bei einer differenzierten Wiederherstellung wird nur die mit der Pipeline verbundene Komponente der fehlgeschlagenen Aufgabe neu gestartet. Im folgenden Beispiel hat das Auftragsdiagramm 5 Scheitelpunkte (A bis E). Alle Verbindungen zwischen den Scheitelpunkten werden punktweise in Pipelines verlegt, und der Wert parallelism.default für den Auftrag ist auf 2 eingestellt.

A → B → C → D → E

In diesem Beispiel werden insgesamt 10 Aufgaben ausgeführt. Die erste Pipeline (zu) läuft auf einem a1 e1 TaskManager (TM1), und die zweite Pipeline (a2toe2) läuft auf einer anderen TaskManager (TM2).

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

Es gibt zwei Komponenten, die über eine Pipeline miteinander verbunden sind: a1 → e1 und a2 → e2. Wenn entweder TM1 oder TM2 fehlschlägt, wirkt sich der Fehler nur auf die 5 Aufgaben in der Pipeline aus, bei denen TaskManager lief. Bei der Neustartstrategie wird nur die betroffene Pipeline-Komponente gestartet.

Eine differenzierte Wiederherstellung funktioniert nur mit perfekt parallelen Flink-Aufträgen. Sie wird nicht mit keyBy()- oder redistribute()-Vorgängen unterstützt. Weitere Informationen finden Sie unter FLIP-1: Fine Grained Recovery from Task Failures im Jira-Projekt Flink Improvement Proposal.

Um die differenzierte Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer flink-conf.yaml-Datei fest.

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
Anmerkung

Der kombinierte Neustartmechanismus im Adaptive Scheduler wird mit Flink on Amazon EMR auf EKS 6.15.0 und höher unterstützt.

Der adaptive Scheduler kann die Parallelität des Auftrags auf der Grundlage der verfügbaren Slots anpassen. Er reduziert automatisch die Parallelität, wenn nicht genügend Slots für die konfigurierte Auftragsparallelität verfügbar sind. Wenn neue Slots verfügbar werden, wird der Auftrag wieder auf die konfigurierte Auftragsparallelität hochskaliert. Ein adaptiver Scheduler vermeidet Ausfallzeiten beim Auftrag, wenn nicht genügend Ressourcen verfügbar sind. Dies ist der unterstützte Scheduler für Flink Autoscaler. Aus diesen Gründen empfehlen wir den Adaptive Scheduler mit Amazon EMR Flink. Adaptive Scheduler können jedoch innerhalb kurzer Zeit mehrere Neustarts durchführen, und zwar einen Neustart für jede neu hinzugefügte Ressource. Dies könnte zu einem Leistungsabfall des Auftrags führen.

Mit Amazon EMR 6.15.0 und höher verfügt Flink über einen kombinierten Neustartmechanismus im Adaptive Scheduler, der ein Neustartfenster öffnet, wenn die erste Ressource hinzugefügt wird, und dann bis zum konfigurierten Fensterintervall von standardmäßig 1 Minute wartet. Er führt einen einzigen Neustart durch, wenn genügend Ressourcen zur Verfügung stehen, um den Auftrag mit konfigurierter Parallelität auszuführen, oder wenn das Intervall abgelaufen ist.

Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass dieses Feature 10 % mehr Datensätze verarbeitet als das Standardverhalten, wenn Sie den adaptiven Scheduler und Flink Autoscaler verwenden.

Um den kombinierten Neustartmechanismus zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei flink-conf.yaml fest.

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m