Optimieren Sie benutzerdefinierte Funktionen -

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Optimieren Sie benutzerdefinierte Funktionen

Benutzerdefinierte Funktionen (UDFs) und RDD.map in beeinträchtigen die Leistung PySpark häufig erheblich. Dies liegt an dem Aufwand, der erforderlich ist, um Ihren Python-Code in der Scala-Implementierung, die Spark zugrunde liegt, korrekt darzustellen.

Das folgende Diagramm zeigt die Architektur von PySpark Jobs. Wenn Sie verwenden PySpark, verwendet der Spark-Treiber die Py4j-Bibliothek, um Java-Methoden von Python aus aufzurufen. Beim Aufrufen von Spark SQL oder DataFrame integrierten Funktionen gibt es kaum Leistungsunterschiede zwischen Python und Scala, da die Funktionen auf jedem Executor JVM mit einem optimierten Ausführungsplan ausgeführt werden.

Der Spark-Kontext stellt über Py4J eine Verbindung zum Spark-Treiber her, und der Treiber stellt eine Verbindung zu den Worker-Knoten her.

Wenn Sie Ihre eigene Python-Logik verwenden, z. B. usingmap/ mapPartitions/ udf, wird die Aufgabe in einer Python-Laufzeitumgebung ausgeführt. Die Verwaltung von zwei Umgebungen verursacht Gemeinkosten. Darüber hinaus müssen Ihre Daten im Speicher transformiert werden, damit sie von den integrierten Funktionen der JVM Laufzeitumgebung verwendet werden können. Pickle ist ein Serialisierungsformat, das standardmäßig für den Austausch zwischen der JVM und der Python-Laufzeit verwendet wird. Die Kosten für diese Serialisierung und Deserialisierung sind jedoch sehr hoch, sodass in Java oder Scala UDFs geschriebene Versionen schneller sind als in Python. UDFs

Beachten Sie Folgendes, um den Aufwand für Serialisierung und Deserialisierung zu vermeiden: PySpark

  • Verwenden Sie die integrierten SQL Spark-Funktionen — Erwägen Sie, Ihre eigene Funktion UDF oder Map-Funktion durch Spark SQL oder DataFrame integrierte Funktionen zu ersetzen. Bei der Ausführung von Spark SQL oder DataFrame integrierten Funktionen gibt es kaum Leistungsunterschiede zwischen Python und Scala, da die Aufgaben von jedem Executor ausgeführt werden. JVM

  • Implementieren Sie UDFs in Scala oder Java — Erwägen Sie die Verwendung von aUDF, die in Java oder Scala geschrieben ist, da sie auf dem ausgeführt werden. JVM

  • Verwenden Sie Apache Arrow-based UDFs für vektorisierte Workloads — Erwägen Sie die Verwendung von Arrow-based. UDFs Diese Funktion wird auch als Vectorized (Pandas) bezeichnet. UDF UDF Apache Arrow ist ein sprachunabhängiges In-Memory-Datenformat, mit dem Daten effizient AWS Glue zwischen Python-Prozessen übertragen werden können. JVM Dies ist derzeit für Python-Benutzer, die mit Pandas oder NumPy Daten arbeiten, am vorteilhaftesten.

    Arrow ist ein spaltenförmiges (vektorisiertes) Format. Die Verwendung erfolgt nicht automatisch und erfordert möglicherweise einige geringfügige Änderungen an der Konfiguration oder am Code, um alle Vorteile zu nutzen und die Kompatibilität sicherzustellen. Weitere Informationen und Einschränkungen finden Sie unter Apache Arrow unter PySpark.

    Im folgenden Beispiel wird eine einfache inkrementelle Version UDF in Standard-Python, als Vectorized UDF und in Spark verglichen. SQL

Standardpython UDF

Die Beispielzeit beträgt 3,20 (Sekunden).

Beispiel-Code

# DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # UDF Example def plus(a,b): return a+b spark.udf.register("plus",plus) df.selectExpr("count(plus(a,b))").collect()

Ausführungsplan

== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#124)]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)]) +- Project [pythonUDF0#124] +- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124] +- Project [id#114L AS a#116L, id#114L AS b#117L] +- Range (0, 10000000, step=1, splits=16)

Vektorisiert UDF

Die Beispielzeit beträgt 0,59 (Sekunden).

Vectorized UDF ist fünfmal schneller als das vorherige Beispiel. UDF Wenn Sie Physical Plan das überprüfen, können Sie sehenArrowEvalPython, was zeigt, dass diese Anwendung von Apache Arrow vektorisiert wurde. Um Vectorized zu aktivierenUDF, müssen Sie in Ihrem Code Folgendes angebenspark.sql.execution.arrow.pyspark.enabled = true.

Beispiel-Code

# Vectorized UDF from pyspark.sql.types import LongType from pyspark.sql.functions import count, pandas_udf # Enable Apache Arrow Support spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # Annotate pandas_udf to use Vectorized UDF @pandas_udf(LongType()) def pandas_plus(a,b): return a+b spark.udf.register("pandas_plus",pandas_plus) df.selectExpr("count(pandas_plus(a,b))").collect()

Ausführungsplan

== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L]) +- Project [pythonUDF0#1082L] +- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200 +- Project [id#1072L AS a#1074L, id#1072L AS b#1075L] +- Range (0, 10000000, step=1, splits=16)

Funke SQL

Die Beispielzeit beträgt 0,087 (Sekunden).

Spark SQL ist viel schneller als VectorizedUDF, da die Aufgaben auf jedem Executor JVM ohne Python-Laufzeit ausgeführt werden. Wenn Sie Ihre UDF durch eine integrierte Funktion ersetzen können, empfehlen wir, dies zu tun.

Beispiel-Code

df.createOrReplaceTempView("test") spark.sql("select count(a+b) from test").collect()

Pandas für Big Data verwenden

Wenn Sie bereits mit Pandas vertraut sind und Spark für Big Data verwenden möchten, können Sie die Pandas API auf Spark verwenden. AWS Glue 4.0 und höher unterstützen es. Um loszulegen, können Sie das offizielle Notizbuch Quickstart: Pandas API on Spark verwenden. Weitere Informationen finden Sie in der PySpark Dokumentation.