Stellen bei Spark - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Stellen bei Spark

Sie können Spark-Jobs in einer Anwendung ausführen, bei der der type Parameter auf gesetzt istSPARK. Jobs müssen mit der Spark-Version kompatibel sein, die mit der EMR Amazon-Release-Version kompatibel ist. Wenn Sie beispielsweise Jobs mit Amazon EMR Release 6.6.0 ausführen, muss Ihr Job mit Apache Spark 3.2.0 kompatibel sein. Informationen zu den Anwendungsversionen der einzelnen Versionen finden Sie unter. Release-Versionen von Amazon EMR Serverless

Spark-Job-Parameter

Wenn Sie den verwenden StartJobRunAPI, um einen Spark-Job auszuführen, können Sie die folgenden Parameter angeben.

Runtime-Rolle für Spark-Jobs

Geben executionRoleArnSie hier die ARN für die IAM Rolle an, die Ihre Anwendung zur Ausführung von Spark-Jobs verwendet. Diese Rolle muss die folgenden Berechtigungen enthalten:

  • Lesen Sie aus S3-Buckets oder anderen Datenquellen, in denen sich Ihre Daten befinden

  • Lesen Sie aus S3-Buckets oder -Präfixen, in denen sich Ihr Skript oder Ihre Datei PySpark befindet JAR

  • Schreiben Sie in S3-Buckets, in die Sie Ihre endgültige Ausgabe schreiben möchten

  • Schreiben Sie Protokolle in einen S3-Bucket oder ein Präfix, das Folgendes angibt S3MonitoringConfiguration

  • Zugriff auf KMS Schlüssel, wenn Sie KMS Schlüssel zum Verschlüsseln von Daten in Ihrem S3-Bucket verwenden

  • Zugriff auf den AWS Glue-Datenkatalog, wenn Sie Spark verwenden SQL

Wenn Ihr Spark-Job Daten in oder aus anderen Datenquellen liest oder schreibt, geben Sie die entsprechenden Berechtigungen in dieser IAM Rolle an. Wenn Sie diese Berechtigungen nicht für die IAM Rolle bereitstellen, schlägt der Job möglicherweise fehl. Weitere Informationen erhalten Sie unter Job-Runtime-Rollen für Amazon EMR Serverless und Speichern von Protokollen.

Treiberparameter für Spark-Jobs

Wird verwendet jobDriver, um Eingaben für den Job bereitzustellen. Der Job-Treiber-Parameter akzeptiert nur einen Wert für den Auftragstyp, den Sie ausführen möchten. Für einen Spark-Job lautet der ParameterwertsparkSubmit. Sie können diesen Jobtyp verwenden, um Scala, Java PySpark, SparkR und alle anderen unterstützten Jobs über Spark Submit auszuführen. Spark-Jobs haben die folgenden Parameter:

  • sparkSubmitParameters— Dies sind die zusätzlichen Spark-Parameter, die Sie an den Job senden möchten. Verwenden Sie diesen Parameter, um Spark-Standardeigenschaften wie Treiberspeicher oder Anzahl der Executoren zu überschreiben, wie sie in den Argumenten --conf oder --class definiert sind.

  • entryPointArguments— Dies ist eine Reihe von Argumenten, die Sie an Ihre Haupt JAR - oder Python-Datei übergeben möchten. Sie sollten das Lesen dieser Parameter mit Ihrem Einstiegspunkt-Code regeln. Trennen Sie jedes Argument im Array durch ein Komma.

  • entryPoint— Dies ist der Verweis in Amazon S3 auf die Haupt JAR - oder Python-Datei, die Sie ausführen möchten. Wenn Sie Scala oder Java ausführenJAR, geben Sie SparkSubmitParameters mithilfe des --class Arguments die Haupteintragsklasse an.

Weitere Informationen finden Sie unter Starten von Anwendungen mit Spark-Submit.

Parameter zur Überschreibung der Spark-Konfiguration

Wird verwendet configurationOverrides, um die Konfigurationseigenschaften auf Überwachungs- und Anwendungsebene zu überschreiben. Dieser Parameter akzeptiert ein JSON Objekt mit den folgenden zwei Feldern:

  • monitoringConfiguration‐ Verwenden Sie dieses Feld, um Amazon S3 URL (s3MonitoringConfiguration) anzugeben, in dem der EMR Serverless-Job die Protokolle Ihres Spark-Jobs speichern soll. Stellen Sie sicher, dass Sie diesen Bucket mit demselben Bucket erstellt haben AWS-Konto , der Ihre Anwendung hostet, und mit demselben Bucket, in AWS-Region dem Ihr Job ausgeführt wird.

  • applicationConfiguration— Um die Standardkonfigurationen für Anwendungen zu überschreiben, können Sie in diesem Feld ein Konfigurationsobjekt angeben. Sie können eine Kurzsyntax verwenden, um die Konfiguration bereitzustellen, oder Sie können das Konfigurationsobjekt in einer JSON Datei referenzieren. Konfigurationsobjekte bestehen aus einer Klassifizierung, Eigenschaften und optionalen verschachtelten Konfigurationen. Eigenschaften bestehen aus den Einstellungen, die Sie in dieser Datei überschreiben möchten. Sie können mehrere Klassifizierungen für mehrere Anwendungen in einem einzigen Objekt angeben. JSON

    Anmerkung

    Die verfügbaren Konfigurationsklassifizierungen variieren je nach EMR Serverless-Version. Zum Beispiel Klassifizierungen für benutzerdefiniertes Log4j spark-driver-log4j2 und spark-executor-log4j2 sind nur mit Versionen 6.8.0 und höher verfügbar.

Wenn Sie dieselbe Konfiguration in einer Anwendungsüberschreibung und in Spark-Übergabeparametern verwenden, haben die Spark-Submit-Parameter Priorität. Konfigurationen haben folgende Priorität, von der höchsten zur niedrigsten:

  • Konfiguration, die EMR Serverless bei der Erstellung bereitstellt. SparkSession

  • Konfiguration, die Sie als Teil des sparkSubmitParameters --conf Arguments angeben.

  • Die Konfiguration, die Sie als Teil Ihrer Anwendung angeben, hat Vorrang vor dem Start eines Jobs.

  • Konfiguration, die Sie bei der Erstellung einer Anwendung runtimeConfiguration als Teil Ihrer angeben.

  • Optimierte Konfigurationen, die Amazon für die Veröffentlichung EMR verwendet.

  • Open-Source-Standardkonfigurationen für die Anwendung.

Weitere Informationen zum Deklarieren von Konfigurationen auf Anwendungsebene und zum Überschreiben von Konfigurationen während der Jobausführung finden Sie unter. Standardanwendungskonfiguration für Serverless EMR

Optimierung der dynamischen Ressourcenzuweisung in Spark

Wird verwendetdynamicAllocationOptimization, um die Ressourcennutzung in EMR Serverless zu optimieren. Wenn Sie diese Eigenschaft true in Ihrer Spark-Konfigurationsklassifizierung auf setzen, bedeutet das, dass EMR Serverless die Ressourcenzuweisung für Executoren optimiert, um die Geschwindigkeit, mit der Spark Executoren anfordert und storniert, besser an die Rate anzupassen, mit der Serverless Worker erstellt und freigibt. EMR Dadurch werden Workers bei EMR Serverless stufenübergreifend optimaler wiederverwendet, was zu niedrigeren Kosten führt, wenn Jobs mit mehreren Stufen ausgeführt werden, während die gleiche Leistung beibehalten wird.

Diese Eigenschaft ist in allen EMR Amazon-Release-Versionen verfügbar.

Im Folgenden finden Sie ein Beispiel für eine Konfigurationsklassifizierung mitdynamicAllocationOptimization.

[ { "Classification": "spark", "Properties": { "dynamicAllocationOptimization": "true" } } ]

Beachten Sie Folgendes, wenn Sie die dynamische Allokationsoptimierung verwenden:

  • Diese Optimierung ist für die Spark-Jobs verfügbar, für die Sie die dynamische Ressourcenzuweisung aktiviert haben.

  • Um eine optimale Kosteneffizienz zu erzielen, empfehlen wir, je nach Arbeitslast eine obere Skalierungsgrenze für Mitarbeiter zu konfigurieren, indem Sie entweder die Einstellung auf Auftragsebene spark.dynamicAllocation.maxExecutors oder die Einstellung für die maximale Kapazität auf Anwendungsebene verwenden.

  • Bei einfacheren Aufträgen werden Sie möglicherweise keine Kostenverbesserung feststellen. Wenn Ihr Job beispielsweise auf einem kleinen Datensatz ausgeführt wird oder die Ausführung in einer Phase abgeschlossen wird, benötigt Spark möglicherweise keine größere Anzahl von Executoren oder mehrere Skalierungsereignisse.

  • Bei Jobs mit einer Sequenz aus einer großen Phase, kleineren Phasen und dann wieder einer großen Phase kann es zu einer Regression der Job-Laufzeit kommen. Da EMR Serverless Ressourcen effizienter nutzt, kann dies dazu führen, dass für größere Phasen weniger Arbeitskräfte zur Verfügung stehen, was zu einer längeren Laufzeit führt.

Eigenschaften von Spark-Jobs

In der folgenden Tabelle sind optionale Spark-Eigenschaften und ihre Standardwerte aufgeführt, die Sie überschreiben können, wenn Sie einen Spark-Job einreichen.

Schlüssel Beschreibung Standardwert
spark.archives Eine durch Kommas getrennte Liste von Archiven, die Spark in das Arbeitsverzeichnis jedes Executors extrahiert. Zu den unterstützten Dateitypen gehören.jar, und. .tar.gz .tgz .zip Um den zu extrahierenden Verzeichnisnamen anzugeben, fügen Sie # nach dem Namen der zu extrahierenden Datei ein. Beispiel, file.zip#directory. NULL
spark.authenticate Option, die die Authentifizierung der internen Verbindungen von Spark aktiviert. TRUE
spark.driver.cores Die Anzahl der Kerne, die der Treiber verwendet. 4
spark.driver.extraJavaOptions Zusätzliche Java-Optionen für den Spark-Treiber. NULL
spark.driver.memory Die Menge an Speicher, die der Treiber verwendet. 14 G
spark.dynamicAllocation.enabled Option, die die dynamische Ressourcenzuweisung aktiviert. Diese Option skaliert die Anzahl der bei der Anwendung registrierten Executoren je nach Arbeitslast nach oben oder unten. TRUE
spark.dynamicAllocation.executorIdleTimeout Die Zeitspanne, für die ein Executor inaktiv bleiben kann, bevor Spark ihn entfernt. Dies gilt nur, wenn Sie die dynamische Zuweisung aktivieren. 60er Jahre
spark.dynamicAllocation.initialExecutors Die anfängliche Anzahl von Executoren, die ausgeführt werden sollen, wenn Sie die dynamische Zuweisung aktivieren. 3
spark.dynamicAllocation.maxExecutors Die Obergrenze für die Anzahl der Executoren, wenn Sie die dynamische Zuweisung aktivieren.

Für 6.10.0 und höher infinity

Für 6.9.0 und niedriger 100

spark.dynamicAllocation.minExecutors Die Untergrenze für die Anzahl der Executoren, wenn Sie die dynamische Zuweisung aktivieren. 0
spark.emr-serverless.allocation.batch.size Die Anzahl der Container, die in jedem Zyklus der Executor-Zuweisung angefordert werden sollen. Zwischen den einzelnen Zuweisungszyklen besteht eine Lücke von einer Sekunde. 20
spark.emr-serverless.driver.disk Die Spark-Treiberdiskette. 20G
spark.emr-serverless.driverEnv.[KEY] Option, die dem Spark-Treiber Umgebungsvariablen hinzufügt. NULL
spark.emr-serverless.executor.disk Die Spark-Executor-Diskette. 20G
spark.emr-serverless.memoryOverheadFactor Legt den Speicheraufwand fest, der dem Speicher des Treiber- und Executor-Containers hinzugefügt werden soll. 0.1
spark.emr-serverless.driver.disk.type Der Festplattentyp, der an den Spark-Treiber angeschlossen ist. Standard
spark.emr-serverless.executor.disk.type Der Festplattentyp, der an Spark-Executoren angeschlossen ist. Standard
spark.executor.cores Die Anzahl der Kerne, die jeder Executor verwendet. 4
spark.executor.extraJavaOptions Zusätzliche Java-Optionen für den Spark-Executor. NULL
spark.executor.instances Die Anzahl der zuzuweisenden Spark-Executor-Container. 3
spark.executor.memory Die Menge an Speicher, die jeder Executor verwendet. 14 G
spark.executorEnv.[KEY] Option, die Umgebungsvariablen zu den Spark-Executoren hinzufügt. NULL
spark.files Eine durch Kommas getrennte Liste von Dateien, die in das Arbeitsverzeichnis jedes Executors verschoben werden sollen. Sie können im Executor mit auf die Dateipfade dieser Dateien zugreifen. SparkFiles.get(fileName) NULL
spark.hadoop.hive.metastore.client.factory.class Die Hive-Metastore-Implementierungsklasse. NULL
spark.jars Zusätzliche JAR-Dateien, die dem Laufzeit-Klassenpfad des Treibers und der Executoren hinzugefügt werden sollen. NULL
spark.network.crypto.enabled Option, die die basierte Verschlüsselung aktiviert. AES RPC Dazu gehört das in Spark 2.2.0 hinzugefügte Authentifizierungsprotokoll. FALSE
spark.sql.warehouse.dir Der Standardspeicherort für verwaltete Datenbanken und Tabellen. Der Wert von $PWD/spark-warehouse
spark.submit.pyFiles Eine kommagetrennte Liste von.zip,, oder .py Dateien.egg, die in die PYTHONPATH für Python-Apps eingefügt werden sollen. NULL

In der folgenden Tabelle sind die Standard-Submit-Parameter von Spark aufgeführt.

Schlüssel Beschreibung Standardwert
archives Eine durch Kommas getrennte Liste von Archiven, die Spark in das Arbeitsverzeichnis jedes Executors extrahiert. NULL
class Die Hauptklasse der Anwendung (für Java- und Scala-Apps). NULL
conf Eine beliebige Spark-Konfigurationseigenschaft. NULL
driver-cores Die Anzahl der Kerne, die der Treiber verwendet. 4
driver-memory Die Menge an Speicher, die der Treiber verwendet. 14 G
executor-cores Die Anzahl der Kerne, die jeder Executor verwendet. 4
executor-memory Die Menge an Speicher, die der Executor verwendet. 14 G
files Eine durch Kommas getrennte Liste von Dateien, die im Arbeitsverzeichnis jedes Executors abgelegt werden sollen. Sie können im Executor mit auf die Dateipfade dieser Dateien zugreifen. SparkFiles.get(fileName) NULL
jars Eine durch Kommas getrennte Liste von JAR-Dateien, die in die Klassenpfade des Treibers und des Executors aufgenommen werden sollen. NULL
num-executors Die Anzahl der Executors, die gestartet werden sollen. 3
py-files Eine kommagetrennte Liste von.zip,, oder .py Dateien.egg, die in Python-Apps platziert werden PYTHONPATH sollen. NULL
verbose Option, die zusätzliche Debug-Ausgaben aktiviert. NULL

Spark-Beispiele

Das folgende Beispiel zeigt, wie Sie mit dem StartJobRun API ein Python-Skript ausführen können. Ein end-to-end Tutorial, das dieses Beispiel verwendet, finden Sie unterErste Schritte mit Amazon EMR Serverless. Weitere Beispiele für die Ausführung von PySpark Jobs und das Hinzufügen von Python-Abhängigkeiten finden Sie im EMRServerless GitHub Samples-Repository.

aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py", "entryPointArguments": ["s3://amzn-s3-demo-destination-bucket/wordcount_output"], "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1" } }'

Das folgende Beispiel zeigt, wie Sie den verwenden, StartJobRun API um einen Spark JAR auszuführen.

aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --job-driver '{ "sparkSubmit": { "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar", "entryPointArguments": ["1"], "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1" } }'