Exemplo de código: preparo de dados usando ResolveChoice, Lambda e ApplyMapping
O conjunto de dados que é usado neste exemplo consiste nos dados de pagamento de um provedor da Medicare obtidos por download em dois conjuntos de dados do Data.CMS.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
.
Você pode encontrar o código-fonte para este exemplo no arquivo data_cleaning_and_lambda.py
nos exemplos do AWS Glue
Etapa 1: crawling de dados no bucket do Amazon S3
Faça login no AWS Management Console e abra o console do AWS Glue em https://console.aws.amazon.com/glue/
. -
Seguindo o processo descrito em Configurar um crawler, crie um novo crawler que possa fazer crawling no arquivo
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
e colocar os metadados resultantes em um banco de dados chamadopayments
no AWS Glue Data Catalog. -
Execute o novo crawler e, em seguida, verifique o banco de dados
payments
. Você deve descobrir que o crawler criou uma tabela de metadados nomeadamedicare
no banco de dados depois de ler o início do arquivo para determinar seu formato e delimitador.O esquema da nova tabela
medicare
será o seguinte:Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Etapa 2: adicionar o script de boilerplate ao caderno do endpoint de desenvolvimento
Cole o seguinte script de boilerplate no caderno do endpoint de desenvolvimento para importar as bibliotecas do AWS Glue necessárias e configure um único GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Etapa 3: comparar diferentes análises de esquema
Em seguida, você poderá ver se o esquema que foi reconhecido por um DataFrame
do Apache Spark é igual àquele que seu crawler do AWS Glue gravou. Execute este código:
medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
Veja a seguir a saída da chamada printSchema
:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Em seguida, veja o esquema gerado por um AWS Glue DynamicFrame
:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()
A saída de printSchema
é a seguinte:
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
O DynamicFrame
gera um esquema em que provider id
pode ser um tipo long
ou string
. O esquema DataFrame
lista Provider Id
como um tipo string
, e o Data Catalog lista provider id
como um tipo bigint
.
Qual está correto? Existem dois registros no final do arquivo (de 160 mil registros) com valores string
na coluna em questão. Estes são os registros errados que foram apresentados para ilustrar um problema.
Para resolver esse tipo de problema, o AWS Glue DynamicFrame
apresenta o conceito de um tipo de escolha. Neste caso, o DynamicFrame
mostra que os valores long
e string
podem ser exibidos na coluna. O crawler do AWS Glue não obteve os valores string
porque considerou apenas um prefixo de 2 MB dos dados. O DataFrame
do Apache Spark considerou todo o conjunto de dados, mas foi forçado a atribuir o tipo mais geral à coluna, ou seja, string
. Na verdade, o Spark recorre frequentemente ao caso mais geral quando há tipos complexos ou variações com os quais ele não está familiarizado.
Para consultar a coluna provider id
, resolva primeiro o tipo de escolha. Você pode usar o método de transformação resolveChoice
no seu DynamicFrame
para converter os valores string
para long
com uma opções cast:long
:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()
A saída de printSchema
agora é:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Em que o valor era uma string
que não podia ser convertida, o AWS Glue inseriu null
.
Outra opção é converter o tipo de escolha para struct
, que mantém valores de ambos os tipos.
Em seguida, veja as linhas que eram anômalas:
medicare_res.toDF().where("'provider id' is NULL").show()
Você verá o seguinte:
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Agora, remova os dois registros malformados, da seguinte forma:
medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Etapa 4: mapear os dados e usar as funções Lambda do Apache Spark
O AWS Glue ainda não oferece suporte diretamente às funções Lambda, também conhecidas como funções definidas pelo usuário. No entanto, você sempre pode converter um DynamicFrame
de e para um DataFrame
do Apache Spark para utilizar as funcionalidades do Spark e os recursos especiais de DynamicFrames
.
Em seguida, transforme as informações de pagamento em números para que os mecanismos de análise, como Amazon RedShift ou Amazon Athena, possam fazer seus respectivas análises mais rapidamente:
from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
A saída da chamada de show
é a seguinte:
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Estas são todas as strings nos dados. Podemos usar o eficiente método de transformação apply_mapping
para soltar, renomear, converter e aninhar os dados de modo que outros idiomas e sistemas de programação de dados possam acessá-los facilmente:
from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()
A saída de printSchema
é a seguinte:
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Direcionando os dados de volta para um DataFrame
do Spark, algo parecido com isto será exibido:
medicare_nest_dyf.toDF().show()
A saída é a seguinte:
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Etapa 5: gravar os dados no Apache Parquet
O AWS Glue facilita a escrita dos dados em um formato como o Apache Parquet, que os bancos de dados relacionais possam efetivamente consumir:
glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")