Verwenden der Hochverfügbarkeit (HA) für Flink-Operatoren und Flink-Anwendungen - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden der Hochverfügbarkeit (HA) für Flink-Operatoren und Flink-Anwendungen

In diesem Thema wird gezeigt, wie Hochverfügbarkeit konfiguriert wird, und es wird beschrieben, wie dies für verschiedene Anwendungsfälle funktioniert. Dazu gehören, wenn Sie den Job Manager verwenden und wenn Sie native Kubernetes von Flink verwenden.

Wir ermöglichen Hochverfügbarkeit für den Flink-Operator, sodass wir auf einen Standby-Flink-Operator umschalten können, um Ausfallzeiten im Regelkreis des Bedieners zu minimieren, falls Ausfälle auftreten. Hochverfügbarkeit ist standardmäßig aktiviert, und die Standardanzahl der Startoperatorreplikate ist 2. Sie können das Feld Replicas in Ihrer values.yaml-Datei für das Helm-Chart konfigurieren.

Die folgenden Felder sind anpassbar:

  • replicas (optional, Standardeinstellung ist 2): Wenn Sie diese Zahl auf einen Wert über 1 setzen, werden weitere Standby-Operatoren erstellt und Ihr Auftrag kann schneller wiederhergestellt werden.

  • highAvailabilityEnabled (optional, der Standardwert ist true): Steuert, ob Sie HA aktivieren möchten. Wenn Sie diesen Parameter auf true angeben, wird die Unterstützung für Multi-AZ-Bereitstellungen aktiviert und die richtigen flink-conf.yaml-Parameter festgelegt.

Sie können HA für Ihren Betreiber deaktivieren, indem Sie die folgende Konfiguration in Ihrer values.yaml-Datei festlegen.

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

Multi-AZ-Bereitstellung

Wir erstellen die Operator-Pods in mehreren Availability Zones. Dabei handelt es sich um eine weiche Einschränkung, und Ihre Operator-Pods werden in derselben AZ geplant, falls Sie nicht über genügend Ressourcen in einer anderen AZ verfügen.

Bestimmung des Leader-Replikats

Wenn HA aktiviert ist, verwenden die Replikate ein Leasing, um zu ermitteln, welcher der Leader JMs ist, und verwenden einen K8s-Lease für die Auswahl des Leaders. Sie können den Lease beschreiben und anhand des Felds .Spec.Holder Identity den aktuellen Leader ermitteln

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3-Interaktion

Konfigurieren von Zugriffs-Anmeldeinformationen

Bitte stellen Sie sicher, dass Sie über die entsprechenden IAM Berechtigungen für IRSA den Zugriff auf den S3-Bucket verfügen.

Auftrag-Jars werden aus dem S3-Anwendungsmodus abgerufen

Der Flink-Operator unterstützt auch das Abrufen von Anwendungs-Jars aus S3. Sie geben einfach den S3-Speicherort für das Glas URI in Ihrer FlinkDeployment Spezifikation an.

Sie können diese Funktion auch verwenden, um andere Artefakte wie PyFlink Skripte herunterzuladen. Das resultierende Python-Skript wird unter dem Pfad /opt/flink/usrlib/ abgelegt.

Das folgende Beispiel zeigt, wie Sie diese Funktion für einen PyFlink Job verwenden. Beachten Sie die Felder jar URI und args.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink-S3-Konnektoren

Flink wird mit zwei S3-Konnektoren geliefert (unten aufgeführt). In den folgenden Abschnitten wird erläutert, wann welcher Anschluss verwendet werden sollte.

Checkpointing: Presto-S3-Konnektoren

  • S3-Schema auf s3p://setzen

  • Der empfohlene Konnektor für den Checkpoint zu s3. Weitere Informationen finden Sie unter S3-specific in der Apache Flink-Dokumentation.

Beispielspezifikation: FlinkDeployment

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/

Lesen und Schreiben auf S3: Hadoop S3-Konnektor

  • S3-Schema auf s3:// oder (s3a://) setzen

  • Der empfohlene Konnektor zum Lesen und Schreiben von Dateien aus S3 (einziger S3-Konnektor, der die Flinks-Dateisystemschnittstelle implementiert).

  • Standardmäßig legen wir fs.s3a.aws.credentials.provider in der flink-conf.yaml Datei fest, was istcom.amazonaws.auth.WebIdentityTokenCredentialsProvider. Wenn Sie die Standardeinstellung flink-conf vollständig überschreiben und mit S3 interagieren, stellen Sie sicher, dass Sie diesen Anbieter verwenden.

FlinkDeployment Beispielspezifikation

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

Mit High Availability (HA) für Flink-Bereitstellungen können Jobs auch dann weiter bearbeitet werden, wenn ein vorübergehender Fehler auftritt und Ihr System abstürzt. JobManager Die Aufträge werden neu gestartet, jedoch ab dem letzten erfolgreichen Checkpoint mit aktivierter HA. Wenn HA nicht aktiviert ist, startet Kubernetes Ihren Job neu JobManager, aber Ihr Job wird als neuer Job gestartet und verliert seinen Fortschritt. Nach der Konfiguration von HA können wir Kubernetes anweisen, die HA-Metadaten in einem persistenten Speicher zu speichern, um im Falle eines vorübergehenden Fehlers darauf zurückgreifen zu können, JobManager und dann unsere Jobs vom letzten erfolgreichen Checkpoint aus fortsetzen.

HA ist standardmäßig für Ihre Flink-Aufträge aktiviert (die Anzahl der Replikate ist auf 2 festgelegt, sodass Sie einen S3-Speicherort angeben müssen, damit HA-Metadaten bestehen bleiben).

HA-Konfigurationen

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

Im Folgenden finden Sie Beschreibungen für die oben genannten HA-Konfigurationen in Job Manager (definiert unter .spec). jobManager):

  • highAvailabilityEnabled (optional, der Standardwert ist true): Setzen Sie diesen Wert auf false , wenn Sie HA nicht aktivieren und die bereitgestellten HA-Konfigurationen nicht verwenden möchten. Sie können das Feld „Replicas“ immer noch bearbeiten, um HA manuell zu konfigurieren.

  • replicas(optional, Standardeinstellung ist 2): Wenn Sie diese Zahl auf einen Wert über 1 setzen, wird ein anderer Standby-Modus aktiviert JobManagers und Ihr Job kann schneller wiederhergestellt werden. Wenn Sie HA deaktivieren, müssen Sie die Anzahl der Replikate auf 1 setzen, sonst erhalten Sie weiterhin Validierungsfehler (nur 1 Replikat wird unterstützt, wenn HA nicht aktiviert ist).

  • storageDir(erforderlich): Da wir die Anzahl der Replikate standardmäßig auf 2 setzen, müssen wir einen persistenten storageDir Wert angeben. Derzeit akzeptiert dieses Feld nur S3-Pfade als Speicherort.

Pod-Lokalität

Wenn Sie HA aktivieren, versuchen wir auch, Pods in derselben AZ zusammenzufassen, was zu einer verbesserten Leistung führt (geringere Netzwerklatenz, da sich Pods in derselben AZs befinden). Dabei handelt es sich um einen bestmöglichen Prozess. Wenn Sie also nicht über genügend Ressourcen in der AZ verfügen, in der die meisten Ihrer Pods geplant sind, werden die verbleibenden Pods zwar geplant, landen aber möglicherweise auf einem Knoten außerhalb dieser AZ.

Bestimmung des Leader-Replikats

Wenn HA aktiviert ist, ermitteln die Replikate anhand eines Leases, welches der Leader JMs ist, und verwenden eine K8s-Configmap als Datenspeicher zum Speichern dieser Metadaten. Wenn Sie den Leader ermitteln möchten, können Sie sich den Inhalt der Configmap und den Schlüssel org.apache.flink.k8s.leader.restserver unter Daten ansehen, um den K8s-Pod mit der IP-Adresse zu finden. Sie können auch die folgenden Bash-Befehle verwenden.

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 und höher unterstützt Flink Native Kubernetes für die Ausführung von Flink-Anwendungen im Hochverfügbarkeitsmodus auf einem Amazon-Cluster. EKS

Anmerkung

Sie müssen einen Amazon-S3-Bucket erstellt haben, um die Hochverfügbarkeitsmetadaten zu speichern, wenn Sie Ihren Flink-Auftrag einreichen. Wenn Sie dieses Feature nicht verwenden möchten, können Sie sie deaktivieren. Sie ist standardmäßig aktiviert.

Um die Hochverfügbarkeitsfunktion von Flink zu aktivieren, geben Sie bei der Ausführung des Befehls die folgenden Flink-Parameter an. run-application CLI Die Parameter sind unter dem Beispiel definiert.

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir – Ein S3-Bucket, in dem Sie die Ergebnisse dieser Anforderung speichern möchten.

    Dkubernetes.jobmanager.replicas – Die Anzahl der Job-Manager-Pods, die als Ganzzahl größer als 1 erstellt werden sollen.

    Dkubernetes.cluster-id – Eine eindeutige ID, die den Flink-Cluster identifiziert.