本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
本文件提供了如何將檔案和 Python 程式庫匯入 Amazon Athena for Apache Spark 的範例。
考量事項與限制
-
Python 版本 – 目前,Athena for Spark 使用 Python 版本 3.9.16。請注意,Python 套件對 Python 次要版本敏感。
-
Athena for Spark 架構 – Athena for Spark 在 ARM64 架構上使用 Amazon Linux 2。請注意,某些 Python 程式庫不會為此架構散發二進位檔。
-
二進位共用物件 (SOS) – 因為 SparkContext addPyFile
方法不會偵測二進位共用物件,因此無法在 Athena 中使用它來新增依賴於共用物件的 Python 套件。 -
彈性分散式資料集 (RDD) – 不支援 RDD
。 -
資料框架 – 不支援 PySpark DataFrame.foreach
方法。
範例
這些範例使用下列慣例。
-
預留位置 Amazon S3 位置
s3://amzn-s3-demo-bucket
。將其替換成您自己的 S3 儲存貯體位置。 -
從 Unix Shell 執行的所有程式碼區塊顯示為
directory_name
$
。例如,目錄/tmp
中的命令ls
及其輸出如下所示:/tmp $ ls
輸出
file1 file2
匯入文字檔案以用於計算
本節中的範例說明如何在 Athena for Spark 的筆記本中匯入用於計算的文字檔案。
下列範例會顯示如何將檔案寫入本機暫存目錄、將其新增至筆記本,然後進行測試。
import os
from pyspark import SparkFiles
tempdir = '/tmp/'
path = os.path.join(tempdir, "test.txt")
with open(path, "w") as testFile:
_ = testFile.write("5")
sc.addFile(path)
def func(iterator):
with open(SparkFiles.get("test.txt")) as testFile:
fileVal = int(testFile.readline())
return [x * fileVal for x in iterator]
#Test the file
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
udf_with_import = udf(func)
df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("col", udf_with_import(col('_2'))).show()
輸出
Calculation completed.
+---+---+-------+
| _1| _2| col|
+---+---+-------+
| 1| a|[aaaaa]|
| 2| b|[bbbbb]|
+---+---+-------+
下列範例會示範如何將檔案從 Amazon S3 匯入筆記本中,然後進行測試。
若要將檔案從 Amazon S3 匯入筆記本
-
建立一個名為
test.txt
的檔案,其中具有包含值5
的單行。 -
將檔案新增到 Amazon S3 中的儲存貯體。此範例使用位置
s3://amzn-s3-demo-bucket
。 -
使用以下程式碼,將檔案匯入您的筆記本並測試檔案。
from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
輸出
Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+
新增 Python 檔案
本節中的範例說明如何將 Python 檔案和程式庫新增到 Athena 中的 Spark 筆記本。
下列範例會示範如何將 Python 檔案從 Amazon S3 匯入您的筆記本,並註冊 UDF。
若要將 Python 檔案新增至您的筆記本並註冊 UDF
-
使用您自己的 Amazon S3 位置,建立包含下列內容的檔案
s3://amzn-s3-demo-bucket/file1.py
:def xyz(input): return 'xyz - udf ' + str(input);
-
在同一個 S3 位置中,建立包含下列內容的檔案
s3://amzn-s3-demo-bucket/file2.py
:from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
-
在 Athena for Spark 筆記本中,執行下列命令。
sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)
輸出
Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+
您可以使用 Python addPyFile
和 import
方法將 Python .zip 檔案匯入您的筆記本。
注意
您匯入至 Athena Spark 的 .zip
檔案可能只包含 Python 套件。例如,不支援包含 C 型檔案的套件。
若要將 Python .zip
檔案匯入您的筆記本
-
在您的本機電腦上的桌面目錄中 (如
\tmp
),建立名為moduletest
的目錄。 -
在
moduletest
目錄中,建立名為hello.py
的檔案,內含下列內容:def hi(input): return 'hi ' + str(input);
-
在同一個目錄中,新增名稱為
__init__.py
的空檔案。如果您列出目錄內容,則其現在應如下所示。
/tmp $ ls moduletest __init__.py hello.py
-
使用
zip
命令將兩個模組檔案放入名為moduletest.zip
的檔案。moduletest $ zip -r9 ../moduletest.zip *
-
將
.zip
檔案上傳至 Amazon S3 中的儲存貯體。 -
使用以下程式碼,將 Python
.zip
檔案匯入您的筆記本。sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()
輸出
Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+
下列程式碼範例說明如何從 Amazon S3 中的某個位置新增和匯入兩個不同版本的 Python 程式庫做為兩個獨立的模組。程式碼會從 S3 新增每個程式庫檔案、匯入檔案,然後列印程式庫版本以驗證匯入。
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip')
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip')
import simplejson_v3_15
print(simplejson_v3_15.__version__)
輸出
3.15.0
import simplejson_v3_17_6
print(simplejson_v3_17_6.__version__)
輸出
3.17.6
此範例使用 pip
命令從 Python Package Index (PyPI)
若要從 PyPI 匯入 Python .zip 檔案
-
在本機桌面上,使用下列命令建立名為
testpiglatin
的目錄並建立虛擬環境。/tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .
輸出
created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
-
建立名為
unpacked
的子目錄,以啟動項目。testpiglatin $ mkdir unpacked
-
使用
pip
命令,將專案安裝到unpacked
目錄。testpiglatin $ bin/pip install -t $PWD/unpacked piglatin
輸出
Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
-
檢查目錄的內容。
testpiglatin $ ls
輸出
bin lib pyvenv.cfg unpacked
-
變更為
unpacked
目錄並顯示內容。testpiglatin $ cd unpacked unpacked $ ls
輸出
piglatin piglatin-1.0.6.dist-info
-
使用
zip
命令,將 piglatin 專案的內容放入名為library.zip
的檔案。unpacked $ zip -r9 ../library.zip *
輸出
adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
-
(選用) 使用以下命令在本機測試匯入。
-
將 Python 路徑設定為
library.zip
檔案位置並啟動 Python。/home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3
輸出
Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
-
匯入程式庫並執行測試命令。
>>> import piglatin >>> piglatin.translate('hello')
輸出
'ello-hay'
-
-
使用如下命令,從 Amazon S3 新增
.zip
檔案,將檔案匯入 Athena 中的筆記本,然後對其進行測試。sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()
輸出
Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+
此範例從 PyPI 匯入 md2gemini
cjkwrap mistune wcwidth
若要匯入具有相依性的 Python .zip 檔案
-
在本機電腦上,使用下列命令建立名為
testmd2gemini
的目錄並建立虛擬環境。/tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
-
建立名為
unpacked
的子目錄,以啟動項目。testmd2gemini $ mkdir unpacked
-
使用
pip
命令,將專案安裝到unpacked
目錄。/testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini
輸出
Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
-
變更為
unpacked
目錄並檢查內容。testmd2gemini $ cd unpacked unpacked $ ls -lah
輸出
total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
-
使用
zip
命令,將 md2gemini 專案的內容放入名為md2gemini.zip
的檔案。unpacked $ zip -r9 ../md2gemini *
輸出
adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
-
(選用) 使用下列指令來測試程式庫是否可在您的本機電腦上運作。
-
將 Python 路徑設定為
md2gemini.zip
檔案位置並啟動 Python。/home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
-
匯入程式庫並執行測試。
>>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))
輸出
https://abc.def abc
-
-
使用以下命令,從 Amazon S3 新增
.zip
檔案,將檔案匯入 Athena 中的筆記本,然後執行 UDF 測試。# (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))
輸出
Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
-
使用下列命令執行 UDF 測試。
# (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()
輸出
Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+