将文件和 Python 库导入 Athena for Spark
本文档提供有关如何将文件和 Python 库导入 Amazon Athena for Apache Spark 的示例。
注意事项和限制
-
Python 版本 - 目前,Athena for Spark 使用 Python 版本 3.9.16。请注意,Python 包对次要 Python 版本敏感。
-
Athena for Spark 架构 - Athena for Spark 在 ARM64 架构上使用 Amazon Linux 2。请注意,某些 Python 库不会为此架构分发二进制文件。
-
二进制共享对象(SO)- 由于 SparkContext addPyFile
方法检测不到二进制共享对象,因此无法在 Athena for Spark 中用于添加取决于共享对象的 Python 包。 -
弹性分布式数据集(RDD)- 不支持 RDD
。 -
Dataframe.foreach - 不支持 PySpark DataFrame.foreach
方法。
示例
这些示例使用以下约定。
-
占位符 Amazon S3 位置
s3://amzn-s3-demo-bucket
。将该项替换为您自己的 S3 存储桶位置。 -
从 Unix Shell 执行的所有代码块均显示为
directory_name
$
。例如,目录/tmp
中的命令ls
及其输出显示如下:/tmp $ ls
输出
file1 file2
导入文本文件以用于计算
本节中的示例显示如何导入文本文件以用于 Athena for Spark 笔记本中的计算。
以下示例显示如何将文件写入本地临时目录、将其添加到笔记本并对其进行测试。
import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
输出
Calculation completed.
+---+---+-------+
| _1| _2| col|
+---+---+-------+
| 1| a|[aaaaa]|
| 2| b|[bbbbb]|
+---+---+-------+
以下示例显示如何将文件从 Amazon S3 导入到笔记本中并对其进行测试。
将文件从 Amazon S3 导入到笔记本中
-
创建一个名为
test.txt
的文件,其中有一行包含值5
。 -
将文件添加到 Amazon S3 中的存储桶。此示例使用位置
s3://amzn-s3-demo-bucket
。 -
使用以下代码将文件导入到笔记本中并对其进行测试。
from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
输出
Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+
添加 Python 文件
本节中的示例显示如何将 Python 文件和库添加到 Athena 中的 Spark 笔记本。
以下示例显示如何将 Python 文件从 Amazon S3 添加到笔记本并注册 UDF。
将 Python 文件添加到笔记本并注册 UDF
-
使用您自己的 Amazon S3 位置创建包含以下内容的
s3://amzn-s3-demo-bucket/file1.py
文件:def xyz(input): return 'xyz - udf ' + str(input);
-
在同一 S3 位置创建包含以下内容的
s3://amzn-s3-demo-bucket/file2.py
文件:from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
-
在 Athena for Spark 笔记本中,运行以下命令。
sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)
输出
Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+
您可以使用 Python addPyFile
和 import
方法,将 Python .zip 文件导入笔记本。
注意
导入 Athena Spark 的 .zip
文件可能仅包含 Python 包。例如,不支持包含基于 C 的文件的包。
将 Python .zip
文件导入笔记本
-
在本地计算机上的桌面目录(例如
\tmp
)中,创建一个名为moduletest
的目录。 -
在
moduletest
目录中,创建一个名为hello.py
的文件,该文件包含以下内容:def hi(input): return 'hi ' + str(input);
-
在同一目录中,添加一个名为
__init__.py
的空文件。如果列出目录内容,则它们应类似于以下内容。
/tmp $ ls moduletest __init__.py hello.py
-
使用
zip
命令将两个模块文件放入名为moduletest.zip
的文件中。moduletest $ zip -r9 ../moduletest.zip *
-
将
.zip
文件上传到 Amazon S3 中的存储桶。 -
使用以下代码将 Python
.zip
文件导入笔记本。sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()
输出
Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+
以下代码示例显示如何将两个不同版本的 Python 库作为两个单独的模块从 Amazon S3 中的某个位置添加和导入。该代码会从 S3 添加每个库文件,将其导入,然后打印库版本以验证导入。
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)
输出
3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)
输出
3.17.6
此示例使用 pip
命令从 Python 程序包索引(PyPI)
从 PyPI 导入 Python .zip 文件
-
在本地桌面上,使用以下命令创建名为
testpiglatin
的目录并创建虚拟环境。/tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .
输出
created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
-
创建名为
unpacked
的子目录以保存项目。testpiglatin $ mkdir unpacked
-
使用
pip
命令将该项目安装到unpacked
目录。testpiglatin $ bin/pip install -t $PWD/unpacked piglatin
输出
Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
-
检查目录的内容。
testpiglatin $ ls
输出
bin lib pyvenv.cfg unpacked
-
更改为
unpacked
目录并显示内容。testpiglatin $ cd unpacked unpacked $ ls
输出
piglatin piglatin-1.0.6.dist-info
-
使用
zip
命令将 piglatin 项目的内容放入名为library.zip
的文件中。unpacked $ zip -r9 ../library.zip *
输出
adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
-
(可选)使用以下命令在本地测试导入。
-
将 Python 路径设置为
library.zip
文件位置然后启动 Python。/home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3
输出
Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
-
导入库并运行测试命令。
>>> import piglatin >>> piglatin.translate('hello')
输出
'ello-hay'
-
-
使用以下命令从 Amazon S3 添加
.zip
文件,将其导入 Athena 中的笔记本,然后对其进行测试。sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()
输出
Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+
此示例从 PyPI 导入 md2gemini
cjkwrap mistune wcwidth
导入具有依赖项的 Python .zip 文件
-
在本地计算机上,使用以下命令创建名为
testmd2gemini
的目录并创建虚拟环境。/tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
-
创建名为
unpacked
的子目录以保存项目。testmd2gemini $ mkdir unpacked
-
使用
pip
命令将该项目安装到unpacked
目录。/testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini
输出
Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
-
更改为
unpacked
目录并检查内容。testmd2gemini $ cd unpacked unpacked $ ls -lah
输出
total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
-
使用
zip
命令将 md2gemini 项目的内容放入名为md2gemini.zip
的文件中。unpacked $ zip -r9 ../md2gemini *
输出
adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
-
(可选)使用以下命令测试库是否可以在您的本地计算机上运行。
-
将 Python 路径设置为
md2gemini.zip
文件位置然后启动 Python。/home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
-
导入库并运行测试。
>>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))
输出
https://abc.def abc
-
-
使用以下命令从 Amazon S3 添加
.zip
文件,将其导入 Athena 中的笔记本,然后执行非 UDF 测试。# (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))
输出
Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
-
使用以下命令执行 UDF 测试。
# (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()
输出
Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+