Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exemple de code : préparation des données à l'aide ResolveChoice de Lambda et ApplyMapping
L'ensemble de données utilisé dans cet exemple se compose des données de paiement du fournisseur d'assurance-maladie qui ont été téléchargées à partir de deux jeux de données Data.CMS.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
.
Le code source de cet exemple se trouve dans le data_cleaning_and_lambda.py
fichier du GitHub référentiel AWS Glued'exemples
La méthode préférée pour déboguer Python ou PySpark des scripts pendant l'exécution AWS consiste à utiliser Notebooks on AWS Glue Studio.
Étape 1 : analyser les données dans le compartiment Amazon S3
Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse https://console.aws.amazon.com/glue/
. -
En suivant le processus décrit dansConfiguration d'un crawler, créez un nouveau robot d'exploration capable d'explorer le
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
fichier et de placer les métadonnées obtenues dans une base de données nomméepayments
dans le catalogue de données AWS Glue. -
Exécutez le nouvel crawler, puis activez la base de données
payments
. Vous devriez constater que l'crawler a créé un tableau de métadonnées nommémedicare
dans la base de données après avoir lu le début du fichier pour déterminer son format et son délimiteur.Le schéma de la nouvelle table
medicare
se présente comme suit :Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Étape 2 : Ajouter le script Boilerplate au bloc-notes de point de terminaison de développement
Collez le script Boilerplate suivant dans le bloc-notes du point de terminaison de développement pour importer les bibliothèques AWS Glue dont vous avez besoin, et configurez un GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Étape 3 : Comparer les analyses des différents schémas
Ensuite, vous pouvez voir si le schéma qui a été reconnu par un DataFrame
Apache Spark est le même que celui que votre crawler AWS Glue a enregistré. Exécutez ce code :
medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
Voici la sortie de l'appel printSchema
:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Ensuite, examinez le schéma généré par un DynamicFrame
AWS Glue :
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()
La sortie de printSchema
est la suivante :
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Le DynamicFrame
génère un schéma dans lequel provider id
peut être un type long
ou string
. Le schéma DataFrame
répertorie Provider Id
comme étant un type string
et le provider id
répertorie Data Catalog comme étant un type bigint
.
Lequel est correct ? Il existe deux enregistrements à la fin du fichier (sur un total de 160 000 enregistrements) avec des valeurs string
dans cette colonne. Il s'agit des enregistrements erronés qui ont été introduits pour illustrer un problème.
Pour résoudre ce type de problème, DynamicFrame
AWS Glue présente le concept de type choice. Dans ce cas, DynamicFrame
montre que les valeurs long
et string
peuvent apparaître dans cette colonne. L'crawler AWS Glue a manqué les valeurs string
car il seulement a pris en compte un préfixe de 2 Mo de données. Le DataFrame
Apache Spark a pris en compte la totalité de l'ensemble de données, mais il a été contraint d'affecter le type le plus général à la colonne, à savoir string
. En fait, Spark a souvent recours au cas le plus général pour des types ou des variations complexes avec lesquels il n'est pas familier.
Pour interroger la colonne provider id
, commencez par résoudre le type de choix. Vous pouvez utiliser la méthode de transformation resolveChoice
dans votre DynamicFrame
pour convertir ces valeurs string
en valeurs long
avec une option cast:long
:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()
La sortie printSchema
est désormais :
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Lorsque la valeur string
ne peut pas être convertie, AWS Glue insère une valeur null
.
Une autre option consiste à convertir le type de choix en struct
, qui conserve les valeurs des deux types.
Ensuite, regardez les lignes qui étaient anormales :
medicare_res.toDF().where("'provider id' is NULL").show()
Les informations suivantes s'affichent :
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Maintenant, supprimez les deux enregistrements incorrects, comme suit :
medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Étape 4 : Mapper les données et utiliser les fonctions Lambda Apache Spark
AWS Glue ne prend pas encore directement en charge les fonctions Lambda, également connues sous le nom de fonctions définies par l'utilisateur. Mais vous pouvez toujours convertir un DynamicFrame
vers et depuis un DataFrame
Apache Spark pour tirer avantage de la fonction Spark en plus des fonctions spécifiques de DynamicFrames
.
Ensuite, convertissez les informations de paiement en chiffres, de sorte que les moteurs analytiques tels qu'Amazon Redshift ou Amazon Athena puissent analyser les chiffres plus rapidement :
from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
La sortie de l'appel show
est la suivante :
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Cela reste toujours des chaînes dans les données. Nous pouvons utiliser la puissante méthode de transformation apply_mapping
pour supprimer, renommer, analyser et imbriquer les données afin que d'autres systèmes et langages de programmation puissent facilement y accéder :
from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()
La sortie printSchema
est la suivante :
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Vous pouvez reconvertir les données en DataFrame
Spark, comme suit :
medicare_nest_dyf.toDF().show()
La sortie est la suivante :
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Étape 5 : Écrire les données dans Apache Parquet
AWS Glue permet d'écrire facilement des données dans un format tel que Apache Parquet, que les bases de données relationnelles peuvent utiliser de manière efficace :
glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")