Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exemple de code : Données de jonction et de mise en relation
Cet exemple utilise un ensemble de données qui a été téléchargé sur http://everypolitician.org/sample-dataset
d'Amazon Simple Storage Service (Amazon S3) : s3://awsglue-datasets/examples/us-legislators/all
. Cet ensemble de données contient des données au format JSON concernant les législateurs américains et les fonctions qu'ils ont occupées à la Chambre des représentants et au Sénat. Il a été légèrement modifié et il a été mis à la disposition des utilisateurs dans un compartiment Amazon S3 public aux fins de ce didacticiel.
Le code source de cet exemple se trouve dans le join_and_relationalize.py
fichier du référentiel AWS Glue d'échantillons
Grâce à ces données, ce didacticiel vous montre comment effectuer les opérations suivantes :
Utilisez un AWS Glue robot d'exploration pour classer les objets stockés dans un compartiment Amazon S3 public et enregistrez leurs schémas dans le catalogue de données AWS Glue.
Examiner les métadonnées et les schémas de la table résultant de l'analyse ;
-
Écrire un script Extract-transform-load (ETL) en Python qui utilise les métadonnées dans Data Catalog pour effectuer les actions suivantes :
Joindre les données dans différents fichiers sources en une seule table de données (c'est-à-dire, dénormaliser les données).
Filtrer la table jointe en tables distinctes par type de législateur.
Écrire les données résultantes en vue de séparer les fichiers Apache Parquet à des fins d'analyse ultérieure.
La méthode préférée pour déboguer Python ou PySpark des scripts pendant l'exécution AWS consiste à utiliser Notebooks on AWS Glue Studio.
Étape 1 : analyser les données dans le compartiment Amazon S3
-
Connectez-vous à la AWS Management ConsoleAWS Glue console et ouvrez-la à l'adresse https://console.aws.amazon.com/glue/
. -
En suivant les étapes décritesConfiguration d'un crawler, créez un nouveau robot d'exploration capable d'explorer le
s3://awsglue-datasets/examples/us-legislators/all
jeu de données dans une base de données nomméelegislators
dans le catalogue de données AWS Glue. Les exemples de données sont déjà dans ce compartiment Amazon S3 public. -
Exécutez le nouvel crawler, puis activez la base de données
legislators
.L'crawler crée les tables de métadonnées suivantes :
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
Il s'agit d'un ensemble de tables semi-normalisé contenant les législateurs et leurs carrières.
-
Étape 2 : Ajouter le script Boilerplate au bloc-notes de point de terminaison de développement
Collez le script Boilerplate suivant dans le bloc-notes du point de terminaison de développement pour importer les bibliothèques AWS Glue dont vous avez besoin, et configurez un GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Étape 3 : examiner les schémas à partir des données de Data Catalog
Ensuite, vous pouvez facilement créer, examiner un DynamicFrame à partir du catalogue de données AWS Glue et examiner les schémas des données. Par exemple, pour afficher le schéma de la table persons_json
, ajoutez les éléments suivants à votre bloc-notes :
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
Voici le résultat des appels d'impression :
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Chaque personne contenue dans la table est un membre de certains des organismes du Congrès des Etats-Unis.
Pour afficher le schéma de la table memberships_json
, tapez ce qui suit :
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
La sortie est la suivante :
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
Les organizations
sont des partis et les deux chambres du Congrès, le Sénat et la Chambre des représentants. Pour afficher le schéma de la table organizations_json
, tapez ce qui suit :
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
La sortie est la suivante :
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
Étape 4 : Filtrer les données
Ensuite, conservez uniquement les champs de votre choix, et renommez id
en org_id
. L'ensemble de données est suffisamment petit pour que vous puissiez l'afficher entièrement.
toDF()
convertit DynamicFrame
en DataFrame
Apache Spark, afin que vous puissiez appliquer les transformations qui existent déjà dans Apache Spark SQL :
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
Le résultat est présenté ci-dessous :
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
Tapez la commande suivante pour afficher les organizations
qui apparaissent dans memberships
:
memberships.select_fields(['organization_id']).toDF().distinct().show()
Le résultat est présenté ci-dessous :
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
Étape 5 : Synthèse
À présent, utilisez AWS Glue pour joindre ces tables relationnelles et créer une table complète des carrières des memberships
des législateurs et de leurs organizations
correspondantes.
-
Commencez par joindre
persons
etmemberships
àid
etperson_id
. -
Ensuite, joignez le résultat avec
orgs
àorg_id
etorganization_id
. -
Ensuite, déplacez les champs redondants,
person_id
etorg_id
.
Vous pouvez effectuer toutes ces opérations en une seule ligne de code (étendu) :
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
La sortie est la suivante :
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
Vous disposez à présent de la table finale que vous pouvez utiliser pour l'analyse. Vous pouvez l'écrire dans un format compact et efficace pour les analyses, notamment Parquet, sur lesquelles vous pouvez exécuter SQL dans AWS Glue, Amazon Athena ou Amazon Redshift Spectrum.
L'appel suivant enregistre la table dans plusieurs fichiers afin de prendre en charge des lectures parallèles rapides lors de l'exécution d'une analyse ultérieure :
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
Pour réunir toutes les données d'historique en un seul fichier, vous devez les convertir dans un cadre de données, les repartitionner et les écrire :
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
Sinon, si vous souhaitez les distinguer en fonction du Sénat et de la Chambre des représentants :
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
Étape 6 : Transformer les données pour les bases de données relationnelles
AWS Glue permet d'écrire les données dans des bases de données relationnelles comme Amazon Redshift, même avec des données semi-structurées. Ainsi, vous obtenez une relationalize
de transformation, qui aplatit DynamicFrames
, quelle que soit la complexité des objets du cadre.
Dans cet exemple, à l'aide de l_history
DynamicFrame
, transmettez le nom d'une table racine (hist_root
) et un chemin de travail temporaire relationalize
. Cela renvoie une DynamicFrameCollection
. Vous pouvez ensuite répertorier les noms des DynamicFrames
de cet ensemble :
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
Voici la sortie de l'appel keys
:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
a décomposé la table d'historique en six nouvelles tables : une table racine qui contient un enregistrement pour chaque objet dans le DynamicFrame
et des tables auxiliaires pour les tableaux. La gestion de tableaux dans les bases de données relationnelles est souvent sous-optimale, en particulier lorsque ces tableaux deviennent volumineux. Décomposer les tableaux en différentes tables permet d'accélérer considérablement les requêtes.
Ensuite, consultez la décomposition en examinant contact_details
:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
Voici la sortie de l'appel show
:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
Le champ contact_details
était un tableau de structures dans le DynamicFrame
d'origine. Chaque élément de ces tableaux est une ligne distincte dans la table auxiliaire, indexée par index
. L'id
ici est une clé étrangère dans la table hist_root
avec la clé contact_details
:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
En voici la sortie :
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
Notez dans ces commandes que toDF()
, puis une expression where
sont utilisés pour filtrer les lignes à afficher.
Ainsi, la jonction de la table hist_root
avec les tables auxiliaire vous permet d'effectuer les actions suivantes :
Charger des données dans les bases de données sans prise en charge par un tableau ;
Interroger chaque élément individuel dans un tableau à l'aide de SQL.
Stockez et accédez en toute sécurité à vos informations d'identification Amazon Redshift avec une connexion AWS Glue. Pour plus d'informations sur la création de votre propre connexion, consultez Connexion aux données.
Vous êtes maintenant prêt à écrire vos données dans une connexion en parcourant le DynamicFrames
un par un :
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
Les paramètres de connexion varient en fonction de votre type de base de données relationnelle :
-
Pour obtenir des instructions sur l'écriture sur Amazon Redshift, consultez Connexions Redshift.
-
Pour d'autres bases de données, consultez Types et options de connexion pour ETL dans AWS Glue pour Spark.
Conclusion
En général, AWS Glue est très flexible. Il vous permet de réaliser, en quelques lignes de code, ce qui prendrait normalement plusieurs jours pour écrire. Vous pouvez trouver l'intégralité des scripts source-to-target ETL dans le fichier Python join_and_relationalize.py
dans les AWS Glueexemples