將檔案和 Python 程式庫匯入 Amazon Athena for Apache Spark - Amazon Athena

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將檔案和 Python 程式庫匯入 Amazon Athena for Apache Spark

本文件提供了如何將檔案和 Python 程式庫匯入 Amazon Athena for Apache Spark 的範例。

考量事項與限制

  • Python 版本 – 目前,Athena for Spark 使用 Python 版本 3.9.16。請注意,Python 套件對 Python 次要版本敏感。

  • Athena 的星火架構 — Athena 火花在ARM64架構上使用 Amazon Linux 2。請注意,某些 Python 程式庫不會為此架構散發二進位檔。

  • 二進位共享物件 (SOs) — 由於該 SparkContext addPyFile方法不會偵測二進位共享物件,因此無法在 Athena for Spark 中使用它來新增依賴於共享物件的 Python 套件。

  • 彈性分散式資料集 (RDDs) — 不RDDs受支援。

  • 資料框架 — 不支援 .foreach 方法。 PySpark DataFrame

範例

這些範例使用下列慣例。

  • 預留位置 Amazon S3 位置 s3://amzn-s3-demo-bucket。將其替換成您自己的 S3 儲存貯體位置。

  • 從 Unix 外殼執行的所有代碼塊都顯示為 directory_name $。 例如,目錄ls中的命令/tmp及其輸出顯示如下:

    /tmp $ ls

    輸出

    file1 file2

匯入用於計算的文字檔案

本節中的範例說明如何在 Athena for Spark 的筆記本中匯入用於計算的文字檔案。

將檔案寫入本機暫時目錄後,將檔案新增至筆記本

下列範例會顯示如何將檔案寫入本機暫存目錄、將其新增至筆記本,然後進行測試。

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

輸出

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

從 Amazon S3 匯入檔案

下列範例會示範如何將檔案從 Amazon S3 匯入筆記本中,然後進行測試。

若要將檔案從 Amazon S3 匯入筆記本
  1. 建立一個名為 test.txt 的檔案,其中具有包含值 5 的單行。

  2. 將檔案新增到 Amazon S3 中的儲存貯體。此範例使用位置 s3://amzn-s3-demo-bucket

  3. 使用以下程式碼,將檔案匯入您的筆記本並測試檔案。

    from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    輸出

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

新增 Python 檔案

本節中的範例說明如何將 Python 檔案和程式庫新增到 Athena 中的 Spark 筆記本。

添加 Python 文件並註冊 UDF

下列範例會示範如何將 Python 檔案從 Amazon S3 新增至您的筆記型電腦並註冊UDF.

若要將 Python 檔案新增至您的記事本並註冊 UDF
  1. 使用您自己的 Amazon S3 位置,建立包含下列內容的檔案 s3://amzn-s3-demo-bucket/file1.py

    def xyz(input): return 'xyz - udf ' + str(input);
  2. 在同一個 S3 位置中,建立包含下列內容的檔案 s3://amzn-s3-demo-bucket/file2.py

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. 在 Athena for Spark 筆記本中,執行下列命令。

    sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    輸出

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

匯入 Python .zip 檔案

您可以使用 Python addPyFileimport 方法將 Python .zip 檔案匯入您的筆記本。

注意

您匯入至 Athena Spark 的 .zip 檔案可能只包含 Python 套件。例如,不支援包含 C 型檔案的套件。

若要將 Python .zip 檔案匯入您的筆記本
  1. 在您的本機電腦上的桌面目錄中 (如 \tmp),建立名為 moduletest 的目錄。

  2. moduletest 目錄中,建立名為 hello.py 的檔案,內含下列內容:

    def hi(input): return 'hi ' + str(input);
  3. 在同一個目錄中,新增名稱為 __init__.py 的空檔案。

    如果您列出目錄內容,則其現在應如下所示。

    /tmp $ ls moduletest __init__.py hello.py
  4. 使用 zip 命令將兩個模組檔案放入名為 moduletest.zip 的檔案。

    moduletest $ zip -r9 ../moduletest.zip *
  5. .zip 檔案上傳至 Amazon S3 中的儲存貯體。

  6. 使用以下程式碼,將 Python.zip 檔案匯入您的筆記本。

    sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    輸出

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

將 Python 程式庫的兩個版本匯入為單獨模組

下列程式碼範例說明如何從 Amazon S3 中的某個位置新增和匯入兩個不同版本的 Python 程式庫做為兩個獨立的模組。程式碼會從 S3 新增每個程式庫檔案、匯入檔案,然後列印程式庫版本以驗證匯入。

sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

輸出

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

輸出

3.17.6

從 PyPI 匯入 Python .zip 檔案

此範例使用 pip 命令從 Python Package Index (PyPI) 下載 bpabel/piglatin 專案的 Python .zip 檔案。

若要從 PyPI 匯入 Python .zip 檔案
  1. 在本機桌面上,使用下列命令建立名為 testpiglatin 的目錄並建立虛擬環境。

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    輸出

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. 建立名為 unpacked 的子目錄,以啟動項目。

    testpiglatin $ mkdir unpacked
  3. 使用 pip 命令,將專案安裝到 unpacked 目錄。

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    輸出

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. 檢查目錄的內容。

    testpiglatin $ ls

    輸出

    bin lib pyvenv.cfg unpacked
  5. 變更為 unpacked 目錄並顯示內容。

    testpiglatin $ cd unpacked unpacked $ ls

    輸出

    piglatin piglatin-1.0.6.dist-info
  6. 使用 zip 命令,將 piglatin 專案的內容放入名為 library.zip 的檔案。

    unpacked $ zip -r9 ../library.zip *

    輸出

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (選用) 使用以下命令在本機測試匯入。

    1. 將 Python 路徑設定為 library.zip 檔案位置並啟動 Python。

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      輸出

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. 匯入程式庫並執行測試命令。

      >>> import piglatin >>> piglatin.translate('hello')

      輸出

      'ello-hay'
  8. 使用如下命令,從 Amazon S3 新增 .zip 檔案,將檔案匯入 Athena 中的筆記本,然後對其進行測試。

    sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    輸出

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

從具有相依性的 PyPI 匯入 Python .zip 檔案

此範例從 PyPI 匯入 md2gemini 套件,其中該套件會將 Markdown 格式的文字轉換為 Gemini 文字格式。套件具有以下相依性

cjkwrap mistune wcwidth
若要匯入具有相依性的 Python .zip 檔案
  1. 在本機電腦上,使用下列命令建立名為 testmd2gemini 的目錄並建立虛擬環境。

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. 建立名為 unpacked 的子目錄,以啟動項目。

    testmd2gemini $ mkdir unpacked
  3. 使用 pip 命令,將專案安裝到 unpacked 目錄。

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    輸出

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. 變更為 unpacked 目錄並檢查內容。

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    輸出

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. 使用 zip 命令,將 md2gemini 專案的內容放入名為 md2gemini.zip 的檔案。

    unpacked $ zip -r9 ../md2gemini *

    輸出

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (選用) 使用下列指令來測試程式庫是否可在您的本機電腦上運作。

    1. 將 Python 路徑設定為 md2gemini.zip 檔案位置並啟動 Python。

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. 匯入程式庫並執行測試。

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      輸出

      https://abc.def abc
  7. 使用下列命令從 Amazon S3 新增.zip檔案、將檔案匯入 Athena 的筆記型電腦,然後執行非UDF測試。

    # (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    輸出

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. 使用以下命令執行UDF測試。

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    輸出

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+