Esempio di codice: preparazione dei dati utilizzando ResolveChoice, Lambda e ApplyMapping - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempio di codice: preparazione dei dati utilizzando ResolveChoice, Lambda e ApplyMapping

Il set di dati utilizzati in questo esempio è costituito dai dati del pagamento del provider Medicare scaricati da due set di dati Data.CMS.gov: "Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" (Riepilogo Provider sistema di pagamenti prospettici ospedalieri per i primi 100 gruppi correlati alla diagnosi - FY2011) e "Inpatient Charge Data FY 2011" (Dati di fatturazione ospedaliera FY 2011). Dopo aver scaricato i dati, abbiamo apportato delle modifiche al set di dati al fine di introdurre alcuni record errati nella parte finale del file. Questo file modificato si trova in un bucket pubblico Amazon S3 in s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.

Puoi trovare il codice sorgente di questo esempio nel data_cleaning_and_lambda.py file dell' GitHub archivio degli AWS Glueesempi.

Il modo preferito per eseguire il debug di Python PySpark o degli script durante l'esecuzione consiste nell'utilizzare AWS Notebooks su Glue Studio. AWS

Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3

  1. Accedi e apri la console all'indirizzo https://console.aws.amazon.com/glue/ AWS Management Console . AWS Glue

  2. Seguendo il processo descritto inConfigurazione di un crawler, create un nuovo crawler in grado di eseguire la scansione del s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv file e di inserire i metadati risultanti in un database denominato nel AWS Glue Data payments Catalog.

  3. Esegui il nuovo crawler e controlla il database payments. Il crawler dovrebbe aver creato una tabella di metadati denominata medicare nel database dopo aver letto l'inizio del file per determinarne il formato e il delimitatore.

    Lo schema della nuova tabella medicare è il seguente:

    Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string

Fase 2: aggiunta dello script boilerplate al notebook degli endpoint di sviluppo

Incolla lo script boilerplate seguente nel notebook degli endpoint di sviluppo per importare le librerie AWS Glue necessarie e configurare un singolo oggetto GlueContext:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Fase 3: confronta differenti analisi di schema

Puoi quindi verificare se lo schema riconosciuto da un oggetto DataFrame Apache Spark è uguale a quello registrato dal crawler AWS Glue. Esegui questo codice:

medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()

Ecco l'output dalla chiamata printSchema:

root |-- DRG Definition: string (nullable = true) |-- Provider Id: string (nullable = true) |-- Provider Name: string (nullable = true) |-- Provider Street Address: string (nullable = true) |-- Provider City: string (nullable = true) |-- Provider State: string (nullable = true) |-- Provider Zip Code: integer (nullable = true) |-- Hospital Referral Region Description: string (nullable = true) |-- Total Discharges : integer (nullable = true) |-- Average Covered Charges : string (nullable = true) |-- Average Total Payments : string (nullable = true) |-- Average Medicare Payments: string (nullable = true)

Controlla quindi lo schema generato da un oggetto AWS Glue DynamicFrame:

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()

L'output printSchema è il seguente:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Il DynamicFrame genera uno schema in cui provider id potrebbe essere un tipo long o string. Lo schema DataFrame elenca Provider Id come tipo string e il catalogo dati elenca provider id come tipo bigint.

Qual è corretto? Sono disponibili due record alla fine del file (su 160.000 record) con i valori string nella colonna. Questi sono i record errati che sono stati introdotti per illustrare un problema.

Per risolvere questo problema, l'oggetto AWS Glue DynamicFrame introduce il concetto di tipo choice. In questo caso, DynamicFrame mostra che entrambi i valori long e string possono essere visualizzati nella colonna. Il crawler AWS Glue ha saltato i valori string perché ha considerato solo un prefisso di 2 MB di dati. L'Apache Spark DataFrame ha considerato l'intero set di dati, ma è stato costretto ad assegnare il tipo più generale alla colonna, ossia string. Infatti, Spark spesso ricorre al caso più generale quando non ci sono tipi complessi o variazioni con cui non è familiare.

Per eseguire una query sulla colonna provider id, risolvi prima il tipo di scelta. Puoi utilizzare il metodo di trasformazione resolveChoice in DynamicFrame per convertire quei valori string in valori long con un'opzione cast:long:

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()

L'output printSchema è ora:

root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Nel caso di un valore string di cui non è stato possibile eseguire il cast, AWS Glue ha inserito null.

Un'altra opzione consiste nel convertire il tipo di scelta in struct, che mantiene i valori di entrambi i tipi.

Quindi, esaminare le righe anomale:

medicare_res.toDF().where("'provider id' is NULL").show()

Verrà visualizzato quanto segue:

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ | drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ |948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33| |948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

Ora rimuovi i due record difettosi, come segue:

medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")

Fase 4: mappatura dei dati e utilizzo di funzioni Lambda Apache Spark

AWS Glue non supporta ancora direttamente le funzioni Lambda, note anche come funzioni definite dall'utente. Tuttavia, puoi sempre convertire un DynamicFrame in e da un DataFrame Apache Spark per trarre vantaggio dalle funzionalità Spark, oltre alle funzionalità speciali di DynamicFrames.

Trasforma quindi i dati di pagamento in numeri, in modo che i motori di analisi come Amazon Redshift o Amazon Athena possano eseguire i calcoli più rapidamente:

from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

L'output dalla chiamata show è:

+--------+-------+-------+ | ACC| ATP| AMP| +--------+-------+-------+ |32963.07|5777.24|4763.73| |15131.85|5787.57|4976.71| |37560.37|5434.95|4453.79| |13998.28|5417.56|4129.16| |31633.27|5658.33|4851.44| |16920.79|6653.80|5374.14| |11977.13|5834.74|4761.41| |35841.09|8031.12|5858.50| |28523.39|6113.38|5228.40| |75233.38|5541.05|4386.94| |67327.92|5461.57|4493.57| |39607.28|5356.28|4408.20| |22862.23|5374.65|4186.02| |31110.85|5366.23|4376.23| |25411.33|5282.93|4383.73| | 9234.51|5676.55|4509.11| |15895.85|5930.11|3972.85| |19721.16|6192.54|5179.38| |10710.88|4968.00|3898.88| |51343.75|5996.00|4962.45| +--------+-------+-------+ only showing top 20 rows

Questi sono ancora tutte stringhe nei dati. Puoi utilizzare il potente metodo di trasformazione apply_mapping per eliminare, rinominare, trasmettere e nidificare i dati in modo che i dati di altri linguaggi di programmazione e sistemi possano accedere facilmente:

from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()

L'output printSchema è il seguente:

root |-- drg: string |-- provider: struct | |-- id: long | |-- name: string | |-- city: string | |-- state: string | |-- zip: long |-- rr: string |-- charges: struct | |-- covered: double | |-- total_pay: double | |-- medicare_pay: double

Trasformando i dati in unDataFrame Spark, puoi visualizzare quello che appare ora:

medicare_nest_dyf.toDF().show()

L'output è il seguente:

+--------------------+--------------------+---------------+--------------------+ | drg| provider| rr| charges| +--------------------+--------------------+---------------+--------------------+ |039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...| |039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...| |039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...| |039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...| |039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...| |039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...| |039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...| |039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...| |039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...| |039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...| |039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...| |039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...| |039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...| |039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...| |039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...| |039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...| |039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...| |039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...| |039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...| |039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...| +--------------------+--------------------+---------------+--------------------+ only showing top 20 rows

Fase 5: scrittura dei dati in Apache Parquet

AWS Glue semplifica la scrittura dei dati in un formato come Apache Parquet, che può essere usato in modo efficiente dai database relazionali:

glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")