Optimisation des fonctions définies par l'utilisateur -

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Optimisation des fonctions définies par l'utilisateur

Les fonctions définies par l'utilisateur (UDFs) et RDD.map IN PySpark dégradent souvent les performances de manière significative. Cela est dû à la surcharge requise pour représenter avec précision votre code Python dans l'implémentation Scala sous-jacente de Spark.

Le schéma suivant montre l'architecture des PySpark tâches. Lorsque vous l'utilisez PySpark, le pilote Spark utilise la bibliothèque Py4j pour appeler des méthodes Java depuis Python. Lorsque vous appelez Spark SQL ou des fonctions DataFrame intégrées, il y a peu de différence de performance entre Python et Scala, car les fonctions s'exécutent sur chaque exécuteur à JVM l'aide d'un plan d'exécution optimisé.

Le contexte Spark se connecte au pilote Spark à l'aide de Py4J, et le pilote se connecte aux nœuds de travail.

Si vous utilisez votre propre logique Python, telle que usingmap/ mapPartitions/ udf, la tâche s'exécutera dans un environnement d'exécution Python. La gestion de deux environnements entraîne des frais généraux. En outre, vos données en mémoire doivent être transformées pour être utilisées par les fonctions intégrées de l'environnement JVM d'exécution. Pickle est un format de sérialisation utilisé par défaut pour l'échange entre les environnements d'exécution et JVM Python. Cependant, le coût de cette sérialisation et de cette désérialisation étant très élevé, les UDFs écrits en Java ou en Scala sont plus rapides que Python. UDFs

Pour éviter les surcharges liées à la sérialisation et à la désérialisation PySpark, tenez compte des points suivants :

  • Utilisez les SQL fonctions Spark intégrées : envisagez de remplacer votre propre fonction UDF ou votre fonction cartographique par Spark SQL ou des fonctions DataFrame intégrées. Lors de l'exécution de Spark SQL ou de fonctions DataFrame intégrées, il y a peu de différence de performance entre Python et Scala, car les tâches sont gérées par chaque exécuteur. JVM

  • UDFsImplémenter en Scala ou Java — Envisagez d'utiliser un UDF fichier écrit en Java ou en Scala, car ils s'exécutent sur le. JVM

  • Utiliser la technologie basée sur Apache Arrow UDFs pour les charges de travail vectorisées : pensez à utiliser la technologie basée sur Arrow. UDFs Cette fonctionnalité est également connue sous le nom de vectorisation UDF (PandasUDF). Apache Arrow est un format de données en mémoire indépendant du langage qui AWS Glue peut être utilisé pour transférer efficacement des données entre et JVM des processus Python. C'est actuellement le plus avantageux pour les utilisateurs de Python qui travaillent avec des pandas ou NumPy des données.

    La flèche est un format colonnaire (vectorisé). Son utilisation n'est pas automatique et peut nécessiter quelques modifications mineures de la configuration ou du code pour en tirer pleinement parti et garantir la compatibilité. Pour plus de détails et pour connaître les limites, voir Apache Arrow dans PySpark.

    L'exemple suivant compare un incrémental de base UDF en Python standard, en tant que vectorisé et en UDF Spark. SQL

Python standard UDF

Le temps d'exemple est de 3,20 (sec).

Exemple de code

# DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # UDF Example def plus(a,b): return a+b spark.udf.register("plus",plus) df.selectExpr("count(plus(a,b))").collect()

Plan d'exécution

== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#124)]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)]) +- Project [pythonUDF0#124] +- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124] +- Project [id#114L AS a#116L, id#114L AS b#117L] +- Range (0, 10000000, step=1, splits=16)

Vectorisé UDF

Le temps d'exemple est de 0,59 (sec).

Le vectorisé UDF est 5 fois plus rapide que l'exemple précédentUDF. Vous pouvez voir ArrowEvalPython que cette application est vectorisée par Apache Arrow. Physical Plan Pour activer VectorizedUDF, vous devez le spécifier spark.sql.execution.arrow.pyspark.enabled = true dans votre code.

Exemple de code

# Vectorized UDF from pyspark.sql.types import LongType from pyspark.sql.functions import count, pandas_udf # Enable Apache Arrow Support spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # Annotate pandas_udf to use Vectorized UDF @pandas_udf(LongType()) def pandas_plus(a,b): return a+b spark.udf.register("pandas_plus",pandas_plus) df.selectExpr("count(pandas_plus(a,b))").collect()

Plan d'exécution

== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L]) +- Project [pythonUDF0#1082L] +- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200 +- Project [id#1072L AS a#1074L, id#1072L AS b#1075L] +- Range (0, 10000000, step=1, splits=16)

Étincelle SQL

Le temps d'exemple est de 0,087 (sec).

Spark SQL est beaucoup plus rapide que VectorizedUDF, car les tâches sont exécutées sur chaque exécuteur sans environnement JVM d'exécution Python. Si vous pouvez le remplacer par une fonction intégrée, nous vous recommandons de le faire. UDF

Exemple de code

df.createOrReplaceTempView("test") spark.sql("select count(a+b) from test").collect()

Utiliser les pandas pour les mégadonnées

Si vous connaissez déjà les pandas et que vous souhaitez utiliser Spark pour le Big Data, vous pouvez utiliser les pandas API sur Spark. AWS Glue Les versions 4.0 et ultérieures le supportent. Pour commencer, vous pouvez utiliser le bloc-notes officiel Quickstart : Pandas API on Spark. Pour plus d'informations, consultez la PySpark documentation.