Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esempio di codice: preparazione dei dati utilizzando ResolveChoice, Lambda e ApplyMapping
Il set di dati utilizzati in questo esempio è costituito dai dati del pagamento del provider Medicare scaricati da due set di dati Data.CMS.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
.
Puoi trovare il codice sorgente di questo esempio nel data_cleaning_and_lambda.py
file dell' GitHub archivio degli AWS Glueesempi
Il modo preferito per eseguire il debug di Python PySpark o degli script durante l'esecuzione consiste nell'utilizzare AWS Notebooks su Glue Studio. AWS
Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3
-
Seguendo il processo descritto inConfigurazione di un crawler, create un nuovo crawler in grado di eseguire la scansione del
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
file e di inserire i metadati risultanti in un database denominato nel AWS Glue Datapayments
Catalog. -
Esegui il nuovo crawler e controlla il database
payments
. Il crawler dovrebbe aver creato una tabella di metadati denominatamedicare
nel database dopo aver letto l'inizio del file per determinarne il formato e il delimitatore.Lo schema della nuova tabella
medicare
è il seguente:Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Fase 2: aggiunta dello script boilerplate al notebook degli endpoint di sviluppo
Incolla lo script boilerplate seguente nel notebook degli endpoint di sviluppo per importare le librerie AWS Glue necessarie e configurare un singolo oggetto GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Fase 3: confronta differenti analisi di schema
Puoi quindi verificare se lo schema riconosciuto da un oggetto DataFrame
Apache Spark è uguale a quello registrato dal crawler AWS Glue. Esegui questo codice:
medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
Ecco l'output dalla chiamata printSchema
:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Controlla quindi lo schema generato da un oggetto AWS Glue DynamicFrame
:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()
L'output printSchema
è il seguente:
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Il DynamicFrame
genera uno schema in cui provider id
potrebbe essere un tipo long
o string
. Lo schema DataFrame
elenca Provider Id
come tipo string
e il catalogo dati elenca provider id
come tipo bigint
.
Qual è corretto? Sono disponibili due record alla fine del file (su 160.000 record) con i valori string
nella colonna. Questi sono i record errati che sono stati introdotti per illustrare un problema.
Per risolvere questo problema, l'oggetto AWS Glue DynamicFrame
introduce il concetto di tipo choice. In questo caso, DynamicFrame
mostra che entrambi i valori long
e string
possono essere visualizzati nella colonna. Il crawler AWS Glue ha saltato i valori string
perché ha considerato solo un prefisso di 2 MB di dati. L'Apache Spark DataFrame
ha considerato l'intero set di dati, ma è stato costretto ad assegnare il tipo più generale alla colonna, ossia string
. Infatti, Spark spesso ricorre al caso più generale quando non ci sono tipi complessi o variazioni con cui non è familiare.
Per eseguire una query sulla colonna provider id
, risolvi prima il tipo di scelta. Puoi utilizzare il metodo di trasformazione resolveChoice
in DynamicFrame
per convertire quei valori string
in valori long
con un'opzione cast:long
:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()
L'output printSchema
è ora:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Nel caso di un valore string
di cui non è stato possibile eseguire il cast, AWS Glue ha inserito null
.
Un'altra opzione consiste nel convertire il tipo di scelta in struct
, che mantiene i valori di entrambi i tipi.
Quindi, esaminare le righe anomale:
medicare_res.toDF().where("'provider id' is NULL").show()
Verrà visualizzato quanto segue:
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Ora rimuovi i due record difettosi, come segue:
medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Fase 4: mappatura dei dati e utilizzo di funzioni Lambda Apache Spark
AWS Glue non supporta ancora direttamente le funzioni Lambda, note anche come funzioni definite dall'utente. Tuttavia, puoi sempre convertire un DynamicFrame
in e da un DataFrame
Apache Spark per trarre vantaggio dalle funzionalità Spark, oltre alle funzionalità speciali di DynamicFrames
.
Trasforma quindi i dati di pagamento in numeri, in modo che i motori di analisi come Amazon Redshift o Amazon Athena possano eseguire i calcoli più rapidamente:
from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
L'output dalla chiamata show
è:
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Questi sono ancora tutte stringhe nei dati. Puoi utilizzare il potente metodo di trasformazione apply_mapping
per eliminare, rinominare, trasmettere e nidificare i dati in modo che i dati di altri linguaggi di programmazione e sistemi possano accedere facilmente:
from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()
L'output printSchema
è il seguente:
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Trasformando i dati in unDataFrame
Spark, puoi visualizzare quello che appare ora:
medicare_nest_dyf.toDF().show()
L'output è il seguente:
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Fase 5: scrittura dei dati in Apache Parquet
AWS Glue semplifica la scrittura dei dati in un formato come Apache Parquet, che può essere usato in modo efficiente dai database relazionali:
glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")