Importieren von Dateien und Python-Bibliotheken in Amazon Athena für Apache Spark - Amazon Athena

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Importieren von Dateien und Python-Bibliotheken in Amazon Athena für Apache Spark

Dieses Dokument enthält Beispiele für den Import von Dateien und Python-Bibliotheken in Amazon Athena für Apache Spark.

Überlegungen und Einschränkungen

  • Python-Version – Derzeit verwendet Athena für Spark die Python-Version 3.9.16. Beachten Sie, dass Python-Pakete empfindlich auf Python-Nebenversionen reagieren.

  • Athena for Spark-Architektur — Athena for Spark verwendet Amazon Linux 2 für die Architektur. ARM64 Beachten Sie, dass einige Python-Bibliotheken keine Binärdateien für diese Architektur verteilen.

  • Binäre gemeinsame Objekte (SOs) — Da die SparkContext addPyFileMethode keine binären gemeinsamen Objekte erkennt, kann sie in Athena for Spark nicht verwendet werden, um Python-Pakete hinzuzufügen, die von gemeinsamen Objekten abhängen.

  • Resilient Distributed Datasets (RDDs)RDDswerden nicht unterstützt.

  • DataFrame.forEach — Die PySpark DataFrame .foreach-Methode wird nicht unterstützt.

Beispiele

In den Beispielen werden die folgenden Konventionen verwendet.

  • Der Platzhalter Amazon-S3-Speicherort s3://amzn-s3-demo-bucket. Ersetzen Sie dies durch Ihren eigenen S3-Bucket-Speicherort.

  • Alle Codeblöcke, die von einer Unix-Shell aus ausgeführt werden, werden angezeigt als directory_name $. Der Befehl ls im Verzeichnis /tmp und seine Ausgabe werden beispielsweise wie folgt angezeigt:

    /tmp $ ls

    Ausgabe

    file1 file2

Importieren von Textdateien zur Verwendung in Berechnungen

Die Beispiele in diesem Abschnitt veranschaulichen, wie Sie Textarchive für Berechnungen in Ihren Notebooks in Athena für Spark importieren.

Hinzufügen einer Datei zu einem Notebook nach dem Schreiben in das lokale temporäre Verzeichnis

Das folgende Beispiel zeigt, wie eine Datei in ein lokales temporäres Verzeichnis geschrieben, einem Notebook hinzugefügt und getestet wird.

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

Ausgabe

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Importieren einer Datei aus Amazon S3

Das folgende Beispiel zeigt, wie Sie eine Datei aus Amazon S3 in ein Notebook importieren und testen.

Importieren einer Datei aus Amazon S3 in ein Notebook
  1. Erstellen Sie eine Datei mit dem Namen test.txt, die eine einzelne Zeile mit dem Wert 5 enthält.

  2. Fügen Sie die Datei einem Bucket in Amazon S3 hinzu. In diesem Beispiel wird der Speicherort s3://amzn-s3-demo-bucket verwendet.

  3. Verwenden Sie den folgenden Code, um die Datei in Ihr Notebook zu importieren und die Datei zu testen.

    from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    Ausgabe

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Hinzufügen von Python-Dateien

Die Beispiele in diesem Abschnitt zeigen, wie Sie Ihren Spark-Notebooks in Athena-Python-Dateien und -Bibliotheken hinzufügen.

Python-Dateien hinzufügen und registrieren UDF

Das folgende Beispiel zeigt, wie Sie Python-Dateien von Amazon S3 zu Ihrem Notizbuch hinzufügen und eine registrierenUDF.

Um Python-Dateien zu Ihrem Notizbuch hinzuzufügen und eine zu registrieren UDF
  1. Verwenden Sie Ihren eigenen Amazon-S3-Speicherort und erstellen Sie die Datei s3://amzn-s3-demo-bucket/file1.py mit dem folgenden Inhalt:

    def xyz(input): return 'xyz - udf ' + str(input);
  2. Erstellen Sie am selben S3-Speicherort die Datei s3://amzn-s3-demo-bucket/file2.py mit dem folgenden Inhalt:

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. Führen Sie in Ihrem Athena-for-Spark-Notebook die folgenden Befehle aus.

    sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    Ausgabe

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

Importieren einer Python-ZIP-Datei

Sie können die Python-Methoden addPyFile und import verwenden, um eine Python-ZIP-Datei in Ihr Notebook zu importieren.

Anmerkung

Die .zip-Dateien, die Sie in Athena Spark importieren, enthalten möglicherweise nur Python-Pakete. Beispielsweise wird das Einbeziehen von Paketen mit C-basierten Dateien nicht unterstützt.

So importieren Sie eine .zip-Python-Datei in Ihr Notebook
  1. Erstellen Sie auf Ihrem lokalen Computer in einem Desktop-Verzeichnis wie z. B. \tmp ein Verzeichnis mit dem Namen moduletest.

  2. Erstellen Sie im Verzeichnis moduletest eine Datei namens hello.py mit folgendem Inhalt:

    def hi(input): return 'hi ' + str(input);
  3. Fügen Sie im selben Verzeichnis eine leere Datei mit dem Namen __init__.py hinzu.

    Wenn Sie die Verzeichnisinhalte auflisten, sollten diese jetzt wie folgt aussehen.

    /tmp $ ls moduletest __init__.py hello.py
  4. Verwenden Sie den zip-Befehl, um die beiden Moduldateien in einer Datei mit dem Namen moduletest.zip zu platzieren.

    moduletest $ zip -r9 ../moduletest.zip *
  5. Laden Sie die .zip-Datei in Ihren Bucket in Amazon S3 hoch.

  6. Verwenden Sie den folgenden Code, um die .zip-Python-Datei in Ihr Notebook zu importieren.

    sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Ausgabe

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

Importieren von zwei Versionen einer Python-Bibliothek als separate Module

Die folgenden Codebeispiele zeigen, wie Sie zwei verschiedene Versionen einer Python-Bibliothek von einem Speicherort in Amazon S3 als zwei separate Module hinzufügen und importieren. Der Code fügt jeweils die Bibliotheksdatei aus S3 hinzu, importiert sie und gibt dann die Bibliotheksversion aus, um den Import zu überprüfen.

sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

Ausgabe

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

Ausgabe

3.17.6

Importieren einer Python-ZIP-Datei aus PyPI

In diesem Beispiel wird der pip-Befehl verwendet, um eine Python-ZIP-Datei des Projekts bpabel/piglatin aus dem Python Package Index (PyPI) herunterzuladen.

So importieren Sie eine Python-ZIP-Datei aus PyPI
  1. Verwenden Sie auf Ihrem lokalen Desktop die folgenden Befehle, um ein Verzeichnis mit dem Namen testpiglatin zu erstellen und eine virtuelle Umgebung zu erstellen.

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    Ausgabe

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. Erstellen Sie ein Unterverzeichnis mit dem Namen unpacked, um das Projekt zu speichern.

    testpiglatin $ mkdir unpacked
  3. Verwenden Sie den pip-Befehl, um das Projekt in das unpacked-Verzeichnis zu installieren.

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    Ausgabe

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. Überprüfen Sie den Inhalt des Verzeichnisses.

    testpiglatin $ ls

    Ausgabe

    bin lib pyvenv.cfg unpacked
  5. Wechseln Sie in das unpacked-Verzeichnis und zeigen Sie den Inhalt an.

    testpiglatin $ cd unpacked unpacked $ ls

    Ausgabe

    piglatin piglatin-1.0.6.dist-info
  6. Verwenden Sie den zip-Befehl, um den Inhalt des Piglatin-Projekts in einer Datei mit dem Namen library.zip zu platzieren.

    unpacked $ zip -r9 ../library.zip *

    Ausgabe

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (Optional) Verwenden Sie die folgenden Befehle, um den Import lokal zu testen.

    1. Legen Sie den Python-Pfad auf den Speicherort der library.zip-Datei fest und starten Sie Python.

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      Ausgabe

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. Importieren Sie die Bibliothek und führen Sie einen Testbefehl aus.

      >>> import piglatin >>> piglatin.translate('hello')

      Ausgabe

      'ello-hay'
  8. Verwenden Sie Befehle wie die folgenden, um die .zip-Datei aus Amazon S3 hinzuzufügen, sie in Ihr Notebook in Athena zu importieren und zu testen.

    sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Ausgabe

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

Importieren einer Python-ZIP-Datei aus PyPI mit Abhängigkeiten

In diesem Beispiel wird das md2gemini-Paket, das Text im Markdown in das Gemini-Textformat konvertiert, aus PyPI importiert. Das Paket weist folgende Abhängigkeiten auf:

cjkwrap mistune wcwidth
So importieren Sie eine Python-ZIP-Datei mit Abhängigkeiten
  1. Verwenden Sie auf Ihrem lokalen Computer die folgenden Befehle, um ein Verzeichnis mit dem Namen testmd2gemini zu erstellen und eine virtuelle Umgebung zu erstellen.

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. Erstellen Sie ein Unterverzeichnis mit dem Namen unpacked, um das Projekt zu speichern.

    testmd2gemini $ mkdir unpacked
  3. Verwenden Sie den pip-Befehl, um das Projekt in das unpacked-Verzeichnis zu installieren.

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    Ausgabe

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. Wechseln Sie in das unpacked-Verzeichnis und überprüfen Sie den Inhalt.

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    Ausgabe

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. Verwenden Sie den zip-Befehl, um den Inhalt des md2gemini-Projekts in einer Datei mit dem Namen md2gemini.zip zu platzieren.

    unpacked $ zip -r9 ../md2gemini *

    Ausgabe

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (Optional) Verwenden Sie die folgenden Befehle, um zu testen, ob die Bibliothek auf Ihrem lokalen Computer funktioniert.

    1. Legen Sie den Python-Pfad auf den Speicherort der md2gemini.zip-Datei fest und starten Sie Python.

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. Importieren Sie die Bibliothek und führen Sie einen Test durch.

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      Ausgabe

      https://abc.def abc
  7. Verwenden Sie die folgenden Befehle, um die .zip Datei aus Amazon S3 hinzuzufügen, sie in Ihr Notizbuch in Athena zu importieren und einen UDF Nicht-Test durchzuführen.

    # (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    Ausgabe

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. Verwenden Sie die folgenden Befehle, um einen UDF Test durchzuführen.

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Ausgabe

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+