Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Importer des fichiers et des bibliothèques Python dans Athena pour Spark
Ce document fournit des exemples sur la manière d'importer des fichiers et des bibliothèques Python vers Amazon Athena pour Apache Spark.
Considérations et restrictions
-
Version Python : Athena pour Spark utilise actuellement la version 3.9.16 de Python. Notez que les packages Python sont sensibles aux versions mineures de Python.
-
Athena pour l'architecture Spark — Athena pour Spark utilise Amazon Linux 2 pour l'architecture. ARM64 Notez que certaines bibliothèques Python ne distribuent pas de fichiers binaires pour cette architecture.
-
Objets partagés binaires (SOs) : comme la SparkContext addPyFile
méthode ne détecte pas les objets partagés binaires, elle ne peut pas être utilisée dans Athena for Spark pour ajouter des packages Python qui dépendent d'objets partagés. -
Ensembles de données distribués résilients (RDDs) : ne RDDs
sont pas pris en charge. -
DataFrame.forEach — La méthode .foreach n'est pas prise en PySpark DataFramecharge.
Exemples
Les exemples utilisent les conventions suivantes.
-
Emplacement Amazon S3 réservé
s3://amzn-s3-demo-bucket
. Remplacez-le par l'emplacement de votre propre compartiment S3. -
Tous les blocs de code qui s'exécutent à partir d'un shell Unix sont affichés sous la forme
directory_name
$
. Par exemple, la commandels
du répertoire/tmp
et sa sortie sont affichées comme suit :/tmp $ ls
Sortie
file1 file2
Importation de fichiers texte à utiliser dans les calculs
Les exemples de cette rubrique montrent comment importer des fichiers texte pour les utiliser dans les calculs de vos carnets dans Athena pour Spark.
L'exemple suivant montre comment écrire un fichier dans un répertoire temporaire local, l'ajouter à un bloc-notes et le tester.
import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
Sortie
Calculation completed.
+---+---+-------+
| _1| _2| col|
+---+---+-------+
| 1| a|[aaaaa]|
| 2| b|[bbbbb]|
+---+---+-------+
L'exemple suivant montre comment importer un fichier à partir d'Amazon S3 dans un bloc-notes et le tester.
Importation d'un fichier à partir d'Amazon S3 dans un bloc-notes
-
Créez un fichier
test.txt
dont le nom comporte une seule ligne contenant la valeur5
. -
Ajoutez le fichier à un compartiment dans Amazon S3. Cet exemple utilise l'emplacement
s3://amzn-s3-demo-bucket
. -
Utilisez le code suivant pour importer le fichier dans votre bloc-notes et le tester.
from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
Sortie
Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+
Ajouter des fichiers Python
Les exemples de cette rubrique montrent comment ajouter des fichiers et des bibliothèques Python à vos blocs-notes Spark dans Athena.
L'exemple suivant montre comment ajouter des fichiers Python depuis Amazon S3 à votre bloc-notes et enregistrer unUDF.
Pour ajouter des fichiers Python à votre bloc-notes et enregistrer un UDF
-
En utilisant votre propre emplacement Amazon S3, créez le fichier
s3://amzn-s3-demo-bucket/file1.py
avec le contenu suivant :def xyz(input): return 'xyz - udf ' + str(input);
-
Dans le même emplacement S3, créez le fichier
s3://amzn-s3-demo-bucket/file2.py
avec le contenu suivant :from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
-
Dans votre bloc-notes Athena pour Spark, exécutez les commandes suivantes.
sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)
Sortie
Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+
Vous pouvez utiliser Python addPyFile
et ses méthodes import
pour importer un fichier .zip Python dans votre bloc-notes.
Note
Les fichiers .zip
que vous importez dans Athena Spark peuvent inclure uniquement des packages Python. Par exemple, l'inclusion de packages contenant des fichiers basés sur le langage C n'est pas prise en charge.
Pour importer un fichier .zip
Python dans votre bloc-notes
-
Sur votre ordinateur local, dans un répertoire de bureau tel que
\tmp
, créez un répertoire appelémoduletest
. -
Dans le répertoire
moduletest
, créez un fichier nomméhello.py
avec les contenus suivants :def hi(input): return 'hi ' + str(input);
-
Dans le même répertoire, ajoutez un fichier vide portant le nom
__init__.py
.Si vous listez le contenu du répertoire, il devrait maintenant ressembler à ce qui suit.
/tmp $ ls moduletest __init__.py hello.py
-
Utilisez la commande
zip
pour placer les deux fichiers du module dans un fichier appelémoduletest.zip
.moduletest $ zip -r9 ../moduletest.zip *
-
Chargez les fichiers
.zip
dans votre compartiment Amazon S3. -
Utilisez le code suivant pour importer le fichier
.zip
Python dans votre bloc-notes.sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()
Sortie
Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+
Les exemples de code suivants montrent comment ajouter et importer deux versions différentes d'une bibliothèque Python à partir d'un emplacement dans Amazon S3 en tant que deux modules distincts. Le code ajoute chaque fichier de la bibliothèque à partir de S3, l'importe, puis imprime la version de la bibliothèque pour vérifier l'importation.
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)
Sortie
3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)
Sortie
3.17.6
Cet exemple utilise la commande pip
pour télécharger un fichier .zip Python du projet bpabel/piglatin
Importer un fichier .zip Python à partir de PyPI
-
Sur votre bureau local, utilisez les commandes suivantes pour créer un répertoire appelé
testpiglatin
et un environnement virtuel./tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .
Sortie
created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
-
Créez un sous-répertoire appelé
unpacked
pour contenir le projet.testpiglatin $ mkdir unpacked
-
Utilisez la commande
pip
pour installer le projet dans le répertoireunpacked
.testpiglatin $ bin/pip install -t $PWD/unpacked piglatin
Sortie
Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
-
Vérifiez le contenu du répertoire.
testpiglatin $ ls
Sortie
bin lib pyvenv.cfg unpacked
-
Accédez au répertoire
unpacked
et affichez le contenu.testpiglatin $ cd unpacked unpacked $ ls
Sortie
piglatin piglatin-1.0.6.dist-info
-
Utilisez la commande
zip
pour placer le contenu du projet piglatin dans un fichier appelélibrary.zip
.unpacked $ zip -r9 ../library.zip *
Sortie
adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
-
(Facultatif) Utilisez les commandes suivantes pour tester l'importation localement.
-
Définissez le chemin Python vers l'emplacement du fichier
library.zip
et démarrez Python./home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3
Sortie
Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
-
Importez la bibliothèque et exécutez une commande de test.
>>> import piglatin >>> piglatin.translate('hello')
Sortie
'ello-hay'
-
-
Utilisez des commandes telles que les suivantes pour ajouter le fichier
.zip
à partir d'Amazon S3, l'importer dans votre bloc-notes dans Athena et le tester.sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()
Sortie
Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+
Cet exemple importe le package md2gemini
cjkwrap mistune wcwidth
Importation d'un fichier .zip Python comportant des dépendances
-
Sur votre ordinateur local, utilisez les commandes suivantes pour créer un répertoire appelé
testmd2gemini
et un environnement virtuel./tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
-
Créez un sous-répertoire appelé
unpacked
pour contenir le projet.testmd2gemini $ mkdir unpacked
-
Utilisez la commande
pip
pour installer le projet dans le répertoireunpacked
./testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini
Sortie
Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
-
Accédez au répertoire
unpacked
et vérifiez le contenu.testmd2gemini $ cd unpacked unpacked $ ls -lah
Sortie
total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
-
Utilisez la commande
zip
pour placer le contenu du projet md2gemini dans un fichier appelémd2gemini.zip
.unpacked $ zip -r9 ../md2gemini *
Sortie
adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
-
(Facultatif) Utilisez les commandes suivantes pour vérifier que la bibliothèque fonctionne sur votre ordinateur local.
-
Définissez le chemin Python vers l'emplacement du fichier
md2gemini.zip
et démarrez Python./home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
-
Importez la bibliothèque et exécutez un test.
>>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))
Sortie
https://abc.def abc
-
-
Utilisez les commandes suivantes pour ajouter le
.zip
fichier depuis Amazon S3, l'importer dans votre bloc-notes dans Athena et effectuer un UDF non-test.# (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))
Sortie
Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
-
Utilisez les commandes suivantes pour effectuer un UDF test.
# (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()
Sortie
Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+