Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Konfiguration von Flink in Amazon EMR
Konfigurieren Sie Flink mit Hive Metastore und Glue Catalog
EMRAmazon-Versionen 6.9.0 und höher unterstützen sowohl Hive Metastore als auch AWS Glue Catalog mit dem Apache Flink-Konnektor zu Hive. In diesem Abschnitt werden die Schritte beschrieben, die zur Konfiguration von AWS Glue Catalog und Hive Metastore mit Flink erforderlich sind.
Verwenden Sie den Hive Metastore
-
Erstellen Sie einen EMR Cluster mit Version 6.9.0 oder höher und mindestens zwei Anwendungen: Hive und Flink.
-
Verwenden Sie Script Runner, um das folgende Skript als Schrittfunktion auszuführen:
hive-metastore-setup.sh
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
Verwenden Sie den AWS Glue-Datenkatalog
-
Erstellen Sie einen EMR Cluster mit Version 6.9.0 oder höher und mindestens zwei Anwendungen: Hive und Flink.
-
Wählen Sie in den Einstellungen des AWS Glue Data Catalogs die Option Für Hive-Tabellenmetadaten verwenden aus, um den Data Catalog im Cluster zu aktivieren.
-
Verwenden Sie Script Runner, um das folgende Skript als Schrittfunktion auszuführen: Befehle und Skripts auf einem EMR Amazon-Cluster ausführen:
glue-catalog-setup.sh
sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
Konfigurieren Sie Flink mit einer Konfigurationsdatei
Sie können die EMR Amazon-Konfiguration verwendenAPI, um Flink mit einer Konfigurationsdatei zu konfigurieren. Die Dateien, die innerhalb von konfiguriert werden können, API sind:
-
flink-conf.yaml
-
log4j.properties
-
flink-log4j-session
-
log4j-cli.properties
Die Hauptkonfigurationsdatei für Flink heißt flink-conf.yaml
.
So konfigurieren Sie die Anzahl der Aufgaben-Slots für Flink mithilfe der AWS CLI
-
Erstellen Sie eine Datei,
configurations.json
, mit folgendem Inhalt:[ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
-
Im nächsten Schritt erstellen Sie einen Cluster mit der folgenden Konfiguration:
aws emr create-cluster --release-label
emr-7.5.0
\ --applications Name=Flink \ --configurations file://./configurations.json \ --regionus-east-1
\ --log-uri s3://myLogUri
\ --instance-type m5.xlarge \ --instance-count2
\ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName
,InstanceProfile=EMR_EC2_DefaultRole
Anmerkung
Sie können einige Konfigurationen auch mit dem Flink API ändern. Weitere Informationen finden Sie unter Konzepte
Mit EMR Amazon-Version 5.21.0 und höher können Sie Cluster-Konfigurationen überschreiben und zusätzliche Konfigurationsklassifizierungen für jede Instance-Gruppe in einem laufenden Cluster angeben. Dazu verwenden Sie die EMR Amazon-Konsole, die AWS Command Line Interface (AWS CLI) oder die AWS SDK. Weitere Informationen finden Sie unter Angeben einer Konfiguration für eine Instance-Gruppe in einem aktiven Cluster.
Parallelitätsoptionen
Als Eigentümer Ihrer Anwendung wissen Sie am besten, welche Ressourcen Aufgaben innerhalb von Flink zugewiesen werden müssen. Für Beispiele in dieser Dokumentation verwenden Sie die gleiche Anzahl von Aufgaben wie die Aufgaben-Instances, die Sie für die Anwendung nutzen. Wir empfehlen diese Vorgehensweise generell für die anfängliche Parallelität. Sie können jedoch die Granularität der Parallelität mithilfe von Aufgaben-Slots erhöhen. Dabei sollte die Anzahl von virtuellen Cores
Konfiguration von Flink auf einem EMR Cluster mit mehreren Primärknoten
Der JobManager von Flink bleibt während des Failover-Prozesses für den primären Knoten in einem EMR Amazon-Cluster mit mehreren Primärknoten verfügbar. Ab Amazon EMR 5.28.0 ist JobManager Hochverfügbarkeit auch automatisch aktiviert. Es ist keine manuelle Konfiguration erforderlich.
Bei EMR Amazon-Versionen 5.27.0 oder früher JobManager handelt es sich um einen einzigen Fehlerpunkt. Wenn der JobManager fehlschlägt, gehen alle Jobstatus verloren und die laufenden Jobs werden nicht wieder aufgenommen. Sie können JobManager Hochverfügbarkeit aktivieren, indem Sie die Anzahl der Anwendungsversuche, das Checkpointing und die Aktivierung ZooKeeper als Statusspeicher für Flink konfigurieren, wie das folgende Beispiel zeigt:
[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]
Sie müssen sowohl die maximale Anzahl der Anwendungsmasterversuche für Flink als auch die maximalen Anwendungsversuche für YARN Flink konfigurieren. Weitere Informationen finden Sie unter Konfiguration der Hochverfügbarkeit von YARN Clustern
Konfiguration der Größe des Speicherprozesses
Für EMR Amazon-Versionen, die Flink 1.11.x verwenden, müssen Sie die Gesamtspeicherprozessgröße sowohl für () als auch für JobManager (jobmanager.memory.process.size
) in konfigurieren. TaskManager taskmanager.memory.process.size
flink-conf.yaml
Sie können diese Werte festlegen, indem Sie entweder den Cluster mit der Konfiguration konfigurieren API oder diese Felder manuell über auskommentieren. SSH Flink bietet die folgenden Standardwerte.
-
jobmanager.memory.process.size
: 1 600 m -
taskmanager.memory.process.size
: 1 728 m
Um JVM Metaspace und Overhead auszuschließen, verwenden Sie stattdessen die gesamte Flink-Speichergröße (). taskmanager.memory.flink.size
taskmanager.memory.process.size
Der Standardwert von taskmanager.memory.process.size
beträgt 1 280 m. Es wird nicht empfohlen, sowohl taskmanager.memory.process.size
als auch taskmanager.memory.process.size
zu setzen.
Alle EMR Amazon-Versionen, die Flink 1.12.0 und höher verwenden, haben die im Open-Source-Set für Flink aufgeführten Standardwerte als Standardwerte bei AmazonEMR, sodass Sie sie nicht selbst konfigurieren müssen.
Größe der Protokollausgabedatei konfigurieren
Flink-Anwendungscontainer erstellen drei Arten von Protokolldateien und schreiben in sie: .out
-Dateien, .log
-Dateien und .err
-Dateien. Nur .err
-Dateien werden komprimiert und aus dem Dateisystem entfernt, während .log
- und .out
-Protokolldateien im Dateisystem verbleiben. Um sicherzustellen, dass diese Ausgabedateien verwaltbar bleiben und der Cluster stabil bleibt, können Sie die Protokollrotation in log4j.properties
so konfigurieren, dass eine maximale Anzahl von Dateien festgelegt und deren Größe begrenzt wird.
EMRAmazon-Versionen 5.30.0 und höher
Ab Amazon EMR 5.30.0 verwendet Flink das Logging-Framework log4j2 mit dem Namen der Konfigurationsklassifikation. Die folgende Beispielkonfiguration demonstriert flink-log4j.
das log4j2-Format.
[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]
EMRAmazon-Versionen 5.29.0 und früher
In den EMR Amazon-Versionen 5.29.0 und früher verwendet Flink das Logging-Framework log4j. Die folgende Beispielkonfiguration veranschaulicht das log4j Format.
[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]
Flink für die Ausführung mit Java 11 konfigurieren
EMRAmazon-Versionen 6.12.0 und höher bieten Java 11-Laufzeitunterstützung für Flink. In den folgenden Abschnitten wird beschrieben, wie der Cluster so konfiguriert wird, dass er Java-11-Laufzeitunterstützung für Flink bereitstellt.
Themen
Konfigurieren Sie Flink für Java 11, wenn Sie einen Cluster erstellen
Gehen Sie wie folgt vor, um einen EMR Cluster mit Flink und Java 11-Runtime zu erstellen. Die Konfigurationsdatei, in der Sie die Java-11-Laufzeitunterstützung hinzufügen, befindet sich. flink-conf.yaml
Flink für Java 11 auf einem laufenden Cluster konfigurieren
Gehen Sie wie folgt vor, um einen laufenden EMR Cluster mit Flink und Java 11-Runtime zu aktualisieren. Die Konfigurationsdatei, in der Sie die Java-11-Laufzeitunterstützung hinzufügen, befindet sich. flink-conf.yaml
Bestätigen Sie die Java-Laufzeit für Flink auf einem laufenden Cluster
Um die Java-Laufzeit für einen laufenden Cluster zu ermitteln, melden Sie sich beim primären Knoten an, SSH wie unter Connect zum Primärknoten herstellen mit beschriebenSSH. Führen Sie anschließend den folgenden Befehl aus:
ps -ef | grep flink
Der ps
-Befehl mit der -ef
-Option listet alle laufenden Prozesse auf dem System auf. Sie können diese Ausgabe mit grep
filtern, um Erwähnungen der Zeichenfolge flink
zu finden. Überprüfen Sie die Ausgabe für den Wert Java Runtime Environment (JRE),jre-XX
. In der folgenden Ausgabe wird jre-11
angegeben, dass Java 11 zur Laufzeit von Flink abgerufen wird.
flink 19130 1 0 09:17 ? 00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.
Alternativ können Sie sich mit dem Befehl flink-yarn-session -d
beim primären Knoten anmelden SSH und eine YARN Flink-Sitzung starten. Die Ausgabe zeigt die Java Virtual Machine (JVM) für Flink, java-11-amazon-corretto
im folgenden Beispiel:
2023-05-29 10:38:14,129 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64