Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esempio di codice: unione e relazioni dei dati
In questo esempio viene usato un set di dati scaricato da http://everypolitician.org/sample-dataset
in Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all
. Il set di dati contiene i dati in formato JSON sui legislatori degli Stati Uniti e sui seggi che hanno occupato nella Camera dei rappresentanti e al Senato che sono stati modificati leggermente e resi disponibili in un bucket Amazon S3 pubblico a fini di questo tutorial.
È possibile trovare il codice sorgente di questo esempio nel join_and_relationalize.py
file dell'archivio degli AWS Glue esempi
L'esercitazione illustra con questi dati come:
Usa un AWS Glue crawler per classificare gli oggetti archiviati in un bucket Amazon S3 pubblico e salvare i relativi schemi nel Glue Data Catalog. AWS
Esaminare gli schemi e i metadati della tabella restituiti dal crawling.
-
Scrivere uno script di estrazione, trasferimento e caricamento (ETL) Python che usa i metadati del catalogo dati per:
Unire insieme i dati dei diversi file di origine in un'unica tabella di dati (ovvero denormalizzare i dati).
Filtrare la tabella unita in tabelle separate in base al tipo di legislatore.
Scrivere i dati risultanti per separare i file di Apache Parquet per analisi successive.
Il modo preferito per eseguire il debug di Python PySpark o degli script durante l'esecuzione consiste nell'utilizzare AWS Notebooks su Glue Studio. AWS
Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3
-
Seguendo i passaggi descrittiConfigurazione di un crawler, crea un nuovo crawler in grado di eseguire la scansione del
s3://awsglue-datasets/examples/us-legislators/all
set di dati in un database denominato nel AWS Glue Datalegislators
Catalog. I dati di esempio sono già in questo bucket Amazon S3 pubblico. -
Esegui il nuovo crawler e controlla il database
legislators
.Il crawler crea le seguenti tabelle di metadati:
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
Si tratta di una raccolta di tabelle semi-normalizzata contenenti i legislatori e le relative storie.
-
Fase 2: aggiunta dello script Boilerplate al notebook degli endpoint di sviluppo
Incolla lo script boilerplate seguente nel notebook degli endpoint di sviluppo per importare le librerie AWS Glue necessarie e configurare un singolo oggetto GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Fase 3: esame degli schemi dai dati nel catalogo dati
Successivamente, puoi facilmente creare examine a DynamicFrame dal AWS Glue Data Catalog ed esaminare gli schemi dei dati. Ad esempio, per visualizzare lo schema della tabella persons_json
, aggiungi quanto segue nel notebook:
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
Ecco l'output dalle chiamate di stampa:
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Ogni persona nella tabella è membro di alcuni enti del Congresso degli Stati Uniti.
Per visualizzare lo schema della tabella memberships_json
, digita quando segue:
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
L'output è il seguente:
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
Gli elementi organizations
sono i partiti e le due camere del Congresso, il Senato e la Camera dei rappresentanti. Per visualizzare lo schema della tabella organizations_json
, digita quando segue:
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
L'output è il seguente:
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
Fase 4: filtrare i dati
A questo punto mantieni solo i campi che desideri e rinomina id
in org_id
. Il set di dati è sufficientemente piccolo da poterlo visualizzare tutto insieme.
L'elemento toDF()
converte un oggetto DynamicFrame
in un elemento DataFrame
di Apache Spark in modo da poter applicare le trasformazioni già esistenti in Apache Spark SQL:
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
Di seguito è riportato l'output:
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
Digita quanto segue per visualizzare gli elementi organizations
presenti nell'oggetto memberships
:
memberships.select_fields(['organization_id']).toDF().distinct().show()
Di seguito è riportato l'output:
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
Fase 5: unione dei dati
Usa quindi AWS Glue per unire queste tabelle relazionali e creare una tabella con la cronologia completa per l'oggetto memberships
dei legislatori e i relativi elementi organizations
.
-
In primo luogo, unisci
persons
ememberships
inid
eperson_id
. -
Quindi, unisci il risultato a
orgs
inorg_id
eorganization_id
. -
Quindi, rilascia i campi ridondanti,
person_id
eorg_id
.
Puoi eseguire tutte queste operazioni in una sola riga di codice estesa:
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
L'output è il seguente:
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
Ora hai la tabella finale che puoi utilizzare per l'analisi. Puoi scrivere in un formato compatto ed efficiente per le analisi dei dati, vale a dire Parquet, che puoi usare per eseguire SQL in AWS Glue, Amazon Athena o Amazon Redshift Spectrum.
La seguente chiamata scrive la tabella in più file per supportare le operazioni di lettura parallela veloce nella fase di analisi successiva:
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
Per inserire tutti i dati cronologici in un singolo file, devi convertirli in un frame di dati, suddividerlo in partizioni e scriverlo:
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
In alternativa, se vuoi separarlo dal Senato e dalla Camera:
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
Fase 6: trasformare i dati per i database relazionali
AWS Glue permette di scrivere in modo semplice i dati, anche semistrutturati, in database relazionali come Amazon Redshift,. Offre una trasformazione di tipo relationalize
, che appiattisce gli elementi DynamicFrames
indipendentemente dalla complessità degli oggetti in frame.
Utilizzando l_history
DynamicFrame
in questo esempio, passi il nome di una tabella radice (hist_root
) e un percorso temporaneo a relationalize
. Viene restituito un elemento DynamicFrameCollection
. Puoi quindi elencare i nomi degli elementi DynamicFrames
nella raccolta:
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
Di seguito è riportato l'output della chiamata keys
:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
suddivide la tabella della cronologia in sei nuove tabelle: una tabella radice che contiene un record per ogni oggetto dell'elemento DynamicFrame
e le tabelle ausiliarie per le matrici. La gestione delle matrici nei database relazionali spesso non è ottimali, soprattutto quando le matrici diventano grandi. Separando le matrici in tabelle diverse velocizza l'esecuzione delle query.
A questo punto, controlla la separazione esaminando contact_details
:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
Di seguito è riportato l'output della chiamata show
:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
Il campo contact_details
era una matrice di strutture nell'elemento DynamicFrame
originale. Ogni elemento di tali matrici è una riga separata nella tabella ausiliaria, indicizzata da index
. L'id
qui è una chiave esterna nella tabella hist_root
con la chiave contact_details
:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
Di seguito è riportato l'output:
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
In questi comandi vengono utilizzati toDF()
e un'espressione where
per filtrare le righe che vuoi vedere.
Quindi, unendo la tabella hist_root
con le tabelle ausiliarie ti consente di effettuare le operazioni descritte di seguito.
Caricare i dati nei database senza il supporto di matrici.
Eseguire la query di ogni singolo elemento in una matrice con SQL.
Archivia e accedi in modo sicuro alle tue credenziali Amazon Redshift con una connessione AWS Glue. Per informazioni su come creare la tua connessione, vedi Connessione ai dati.
Siete ora pronti a scrivere i vostri dati su una connessione, scorrendo uno alla volta i DynamicFrames
:
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
Le impostazioni di connessione variano in base al tipo di database relazionale:
-
Per istruzioni su come scrivere su Amazon Redshift, consultare Connessioni Redshift.
-
Per altri database, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
Conclusioni
Complessivamente, AWS Glue è molto flessibile. Con poche righe di codice, ti consente di eseguire ciò che normalmente ti potrebbe richiedere giorni di scrittura. Puoi trovare tutti gli script source-to-target ETL nel file Python join_and_relationalize.py
negli esempi su. AWS Glue