Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping

L'ensemble de données utilisé dans cet exemple se compose des données de paiement du fournisseur d'assurance-maladie qui ont été téléchargées à partir de deux jeux de données Data.CMS.gov : « Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011 » et « Inpatient Charge Data FY 2011 ». Après le téléchargement des données, nous avons modifié le jeu de données de manière à introduire quelques enregistrements erronés à la fin du fichier. Ce fichier modifié est situé dans un compartiment Amazon S3 public à l'adresse s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.

Le code source de cet exemple se trouve dans le data_cleaning_and_lambda.py fichier du GitHub référentiel AWS Glued'exemples.

La méthode préférée pour déboguer Python ou PySpark des scripts pendant l'exécution AWS consiste à utiliser Notebooks on AWS Glue Studio.

Étape 1 : analyser les données dans le compartiment Amazon S3

  1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse https://console.aws.amazon.com/glue/.

  2. En suivant le processus décrit dansConfiguration d'un crawler, créez un nouveau robot d'exploration capable d'explorer le s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv fichier et de placer les métadonnées obtenues dans une base de données nommée payments dans le catalogue de données AWS Glue.

  3. Exécutez le nouvel crawler, puis activez la base de données payments. Vous devriez constater que l'crawler a créé un tableau de métadonnées nommé medicare dans la base de données après avoir lu le début du fichier pour déterminer son format et son délimiteur.

    Le schéma de la nouvelle table medicare se présente comme suit :

    Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string

Étape 2 : Ajouter le script Boilerplate au bloc-notes de point de terminaison de développement

Collez le script Boilerplate suivant dans le bloc-notes du point de terminaison de développement pour importer les bibliothèques AWS Glue dont vous avez besoin, et configurez un GlueContext :

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Étape 3 : Comparer les analyses des différents schémas

Ensuite, vous pouvez voir si le schéma qui a été reconnu par un DataFrame Apache Spark est le même que celui que votre crawler AWS Glue a enregistré. Exécutez ce code :

medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()

Voici la sortie de l'appel printSchema :

root |-- DRG Definition: string (nullable = true) |-- Provider Id: string (nullable = true) |-- Provider Name: string (nullable = true) |-- Provider Street Address: string (nullable = true) |-- Provider City: string (nullable = true) |-- Provider State: string (nullable = true) |-- Provider Zip Code: integer (nullable = true) |-- Hospital Referral Region Description: string (nullable = true) |-- Total Discharges : integer (nullable = true) |-- Average Covered Charges : string (nullable = true) |-- Average Total Payments : string (nullable = true) |-- Average Medicare Payments: string (nullable = true)

Ensuite, examinez le schéma généré par un DynamicFrame AWS Glue :

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()

La sortie de printSchema est la suivante :

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Le DynamicFrame génère un schéma dans lequel provider id peut être un type long ou string. Le schéma DataFrame répertorie Provider Id comme étant un type string et le provider id répertorie Data Catalog comme étant un type bigint.

Lequel est correct ? Il existe deux enregistrements à la fin du fichier (sur un total de 160 000 enregistrements) avec des valeurs string dans cette colonne. Il s'agit des enregistrements erronés qui ont été introduits pour illustrer un problème.

Pour résoudre ce type de problème, DynamicFrame AWS Glue présente le concept de type choice. Dans ce cas, DynamicFrame montre que les valeurs long et string peuvent apparaître dans cette colonne. L'crawler AWS Glue a manqué les valeurs string car il seulement a pris en compte un préfixe de 2 Mo de données. Le DataFrame Apache Spark a pris en compte la totalité de l'ensemble de données, mais il a été contraint d'affecter le type le plus général à la colonne, à savoir string. En fait, Spark a souvent recours au cas le plus général pour des types ou des variations complexes avec lesquels il n'est pas familier.

Pour interroger la colonne provider id, commencez par résoudre le type de choix. Vous pouvez utiliser la méthode de transformation resolveChoice dans votre DynamicFrame pour convertir ces valeurs string en valeurs long avec une option cast:long :

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()

La sortie printSchema est désormais :

root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Lorsque la valeur string ne peut pas être convertie, AWS Glue insère une valeur null.

Une autre option consiste à convertir le type de choix en struct, qui conserve les valeurs des deux types.

Ensuite, regardez les lignes qui étaient anormales :

medicare_res.toDF().where("'provider id' is NULL").show()

Les informations suivantes s'affichent :

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ | drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ |948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33| |948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

Maintenant, supprimez les deux enregistrements incorrects, comme suit :

medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")

Étape 4 : Mapper les données et utiliser les fonctions Lambda Apache Spark

AWS Glue ne prend pas encore directement en charge les fonctions Lambda, également connues sous le nom de fonctions définies par l'utilisateur. Mais vous pouvez toujours convertir un DynamicFrame vers et depuis un DataFrame Apache Spark pour tirer avantage de la fonction Spark en plus des fonctions spécifiques de DynamicFrames.

Ensuite, convertissez les informations de paiement en chiffres, de sorte que les moteurs analytiques tels qu'Amazon Redshift ou Amazon Athena puissent analyser les chiffres plus rapidement :

from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

La sortie de l'appel show est la suivante :

+--------+-------+-------+ | ACC| ATP| AMP| +--------+-------+-------+ |32963.07|5777.24|4763.73| |15131.85|5787.57|4976.71| |37560.37|5434.95|4453.79| |13998.28|5417.56|4129.16| |31633.27|5658.33|4851.44| |16920.79|6653.80|5374.14| |11977.13|5834.74|4761.41| |35841.09|8031.12|5858.50| |28523.39|6113.38|5228.40| |75233.38|5541.05|4386.94| |67327.92|5461.57|4493.57| |39607.28|5356.28|4408.20| |22862.23|5374.65|4186.02| |31110.85|5366.23|4376.23| |25411.33|5282.93|4383.73| | 9234.51|5676.55|4509.11| |15895.85|5930.11|3972.85| |19721.16|6192.54|5179.38| |10710.88|4968.00|3898.88| |51343.75|5996.00|4962.45| +--------+-------+-------+ only showing top 20 rows

Cela reste toujours des chaînes dans les données. Nous pouvons utiliser la puissante méthode de transformation apply_mapping pour supprimer, renommer, analyser et imbriquer les données afin que d'autres systèmes et langages de programmation puissent facilement y accéder :

from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()

La sortie printSchema est la suivante :

root |-- drg: string |-- provider: struct | |-- id: long | |-- name: string | |-- city: string | |-- state: string | |-- zip: long |-- rr: string |-- charges: struct | |-- covered: double | |-- total_pay: double | |-- medicare_pay: double

Vous pouvez reconvertir les données en DataFrame Spark, comme suit :

medicare_nest_dyf.toDF().show()

La sortie est la suivante :

+--------------------+--------------------+---------------+--------------------+ | drg| provider| rr| charges| +--------------------+--------------------+---------------+--------------------+ |039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...| |039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...| |039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...| |039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...| |039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...| |039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...| |039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...| |039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...| |039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...| |039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...| |039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...| |039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...| |039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...| |039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...| |039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...| |039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...| |039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...| |039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...| |039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...| |039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...| +--------------------+--------------------+---------------+--------------------+ only showing top 20 rows

Étape 5 : Écrire les données dans Apache Parquet

AWS Glue permet d'écrire facilement des données dans un format tel que Apache Parquet, que les bases de données relationnelles peuvent utiliser de manière efficace :

glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")