Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Codebeispiel: Datenvorbereitung mit ResolveChoice, Lambda und ApplyMapping
Der in diesem Beispiel verwendete Datensatz besteht aus Zahlungsdaten von Gesundheitsdienstleistern, die von zwei Data.CMS.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
gespeichert.
Den Quellcode für dieses Beispiel finden Sie in der data_cleaning_and_lambda.py
Datei im AWS Glue GitHub Beispiel-Repository
Die bevorzugte Methode zum Debuggen von Python oder PySpark Skripten während der Ausführung AWS ist die Verwendung von Notebooks auf AWS Glue Studio.
Schritt 1: Crawlen der Daten im Amazon S3 Bucket
Melden Sie sich bei der an AWS Management Console und öffnen Sie die AWS Glue Konsole unter https://console.aws.amazon.com/glue/
. -
Erstellen Sie nach dem unter beschriebenen Prozess einen neuen Crawler, der die
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
Datei crawlen und die resultierenden Metadaten in einer Datenbank platzieren kann, diepayments
im AWS Glue-Datenkatalog benannt ist. Konfiguration eines Crawlers -
Führen Sie den neuen Crawler aus und überprüfen Sie die
payments
-Datenbank. Sie sollten feststellen, dass der Crawler eine Metadatentabelle mit Namenmedicare
in der Datenbank angelegt hat, nachdem er den Anfang der Datei gelesen hat, um ihr Format und Trennzeichen zu bestimmen.Das Schema der neuen
medicare
-Tabelle sieht wie folgt aus:Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Schritt 2: Hinzufügen des Boilerplate-Skripts zum Entwicklungsendpunkt-Notebook
Fügen Sie das folgende Boilerplate-Skript in das Entwicklungsendpunkt-Notebook ein, um die gewünschten AWS Glue-Bibliotheken zu importieren, und richten Sie einen einzelnen GlueContext
ein:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Schritt 3: Vergleichen der verschiedene Schema Parsings
Als nächstes können Sie sehen, ob das Schema, das von einem Apache Spark DataFrame
erkannt wurde, mit dem Schema übereinstimmt, das Ihr AWS Glue-Crawler aufgezeichnet hat. Führen Sie diesen Code aus:
medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
Hier ist die Ausgabe des printSchema
-Aufrufs:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Als nächstes schauen Sie sich das Schema an, das ein AWS Glue-DynamicFrame
generiert:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()
Die Ausgabe von printSchema
sieht wie folgt aus:
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Das DynamicFrame
generiert ein Schema, in dem provider id
entweder ein long
oder ein string
Typ sein kann. Das DataFrame
-Schema listet Provider Id
als string
-Typ auf und der Data Catalog listet provider id
als bigint
-Typ auf.
Welches ist richtig? Es gibt zwei Datensätze am Ende der Datei (von 160.000 Datensätzen) mit string
-Werten in dieser Spalte. Dies sind die fehlerhaften Datensätze, die eingefügt wurden, um ein Problem zu veranschaulichen.
Um diese Art von Problem zu lösen, führt AWS Glue-DynamicFrame
das Konzept eines Auswahltyps ein. In diesem Fall zeigt das DynamicFrame
, dass sowohl long
als auch string
Werte in dieser Spalte erscheinen können. Der AWS Glue-Crawler verpasste die string
-Werte, da er nur die ersten 2 MB der Daten berücksichtigt. Der Apache Spark DataFrame
berücksichtigt den gesamten Datensatz. Er wurde jedoch gezwungen, der Spalte den allgemeinsten Typ zuzuweisen (string
). Tatsächlich greift Spark oft auf den allgemeinsten Fall zurück, wenn es komplexe Typen oder Variationen gibt, mit denen es nicht vertraut ist.
Um die provider id
-Spalte abzufragen, lösen Sie zuerst den Auswahltyp auf. Sie können die resolveChoice
-Transformationsmethode in Ihrem DynamicFrame
verwenden, um diese string
Werte in long
Werte mit einer cast:long
Option umzuwandeln:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()
Die printSchema
-Ausgabe ist jetzt:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Wo der Wert ein string
war, das nicht gecastet werden konnte, fügt AWS Glue ein null
ein.
Eine weitere Möglichkeit besteht darin, den Auswahltyp in ein struct
umzuwandeln, das die Werte beider Typen beibehält.
Als nächstes schauen Sie sich die Zeilen an, die nicht normal waren:
medicare_res.toDF().where("'provider id' is NULL").show()
Sie sehen folgendes:
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Entfernen Sie nun die beiden fehlerhaften Datensätze wie folgt:
medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Schritt 4: Zuordnen der Daten und Nutzung der Apache Spark Lambda Funktionen
AWS Glue unterstützt direkt Lambda-Funktionen (benutzerdefinierte Funktionen) noch nicht. Aber Sie können jederzeit ein DynamicFrame
zu und von Apache Spark DataFrame
konvertieren, um die Vorteile der Spark-Funktionalität zusätzlich zu den speziellen Features von DynamicFrames
zu nutzen.
Als nächstes wandeln Sie die Zahlungsinformationen in Zahlen um, sodass Analytik-Engines wie Amazon Redshift oder Amazon Athena ihre Zahlen schneller verarbeiten können:
from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
Die Ausgabe des show
-Aufrufs sieht wie folgt aus:
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Das sind alles Zeichenfolgen in den Daten. Wir können die leistungsstarke apply_mapping
-Transformationsmethode verwenden, um die Daten zu löschen, umzubenennen, zu casten und zu verschachteln, sodass andere Datenprogrammiersprachen und -systeme leicht darauf zugreifen können:
from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()
Die printSchema
-Ausgabe sieht wie folgt aus:
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Wenn Sie die Daten wieder in ein Spark DataFrame
verwandeln, können Sie sehen, wie sie jetzt aussehen:
medicare_nest_dyf.toDF().show()
Die Ausgabe sieht wie folgt aus:
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Schritt 5: Schreiben der Daten in Apache Parquet
AWS Glue erleichtert das Schreiben der Daten in einem Format wie Apache Parquet, das relationale Datenbanken effektiv nutzen können:
glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")