Esempio di codice: unione e relazioni dei dati - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempio di codice: unione e relazioni dei dati

In questo esempio viene usato un set di dati scaricato da http://everypolitician.org/ nel bucket sample-dataset in Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all. Il set di dati contiene i dati in formato JSON sui legislatori degli Stati Uniti e sui seggi che hanno occupato nella Camera dei rappresentanti e al Senato che sono stati modificati leggermente e resi disponibili in un bucket Amazon S3 pubblico a fini di questo tutorial.

È possibile trovare il codice sorgente di questo esempio nel join_and_relationalize.py file dell'archivio degli AWS Glue esempi sul GitHub sito Web.

L'esercitazione illustra con questi dati come:

  • Usa un AWS Glue crawler per classificare gli oggetti archiviati in un bucket Amazon S3 pubblico e salvare i relativi schemi nel Glue Data Catalog. AWS

  • Esaminare gli schemi e i metadati della tabella restituiti dal crawling.

  • Scrivere uno script di estrazione, trasferimento e caricamento (ETL) Python che usa i metadati del catalogo dati per:

    • Unire insieme i dati dei diversi file di origine in un'unica tabella di dati (ovvero denormalizzare i dati).

    • Filtrare la tabella unita in tabelle separate in base al tipo di legislatore.

    • Scrivere i dati risultanti per separare i file di Apache Parquet per analisi successive.

Il modo preferito per eseguire il debug di Python PySpark o degli script durante l'esecuzione consiste nell'utilizzare AWS Notebooks su Glue Studio. AWS

Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3

  1. Accedi a e apri la console all' AWS Management Console indirizzo https://console.aws.amazon.com/glue/. AWS Glue

  2. Seguendo i passaggi descrittiConfigurazione di un crawler, crea un nuovo crawler in grado di eseguire la scansione del s3://awsglue-datasets/examples/us-legislators/all set di dati in un database denominato nel AWS Glue Data legislators Catalog. I dati di esempio sono già in questo bucket Amazon S3 pubblico.

  3. Esegui il nuovo crawler e controlla il database legislators.

    Il crawler crea le seguenti tabelle di metadati:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    Si tratta di una raccolta di tabelle semi-normalizzata contenenti i legislatori e le relative storie.

Fase 2: aggiunta dello script Boilerplate al notebook degli endpoint di sviluppo

Incolla lo script boilerplate seguente nel notebook degli endpoint di sviluppo per importare le librerie AWS Glue necessarie e configurare un singolo oggetto GlueContext:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Fase 3: esame degli schemi dai dati nel catalogo dati

Successivamente, puoi facilmente creare examine a DynamicFrame dal AWS Glue Data Catalog ed esaminare gli schemi dei dati. Ad esempio, per visualizzare lo schema della tabella persons_json, aggiungi quanto segue nel notebook:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

Ecco l'output dalle chiamate di stampa:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Ogni persona nella tabella è membro di alcuni enti del Congresso degli Stati Uniti.

Per visualizzare lo schema della tabella memberships_json, digita quando segue:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

L'output è il seguente:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

Gli elementi organizations sono i partiti e le due camere del Congresso, il Senato e la Camera dei rappresentanti. Per visualizzare lo schema della tabella organizations_json, digita quando segue:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

L'output è il seguente:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

Fase 4: filtrare i dati

A questo punto mantieni solo i campi che desideri e rinomina id in org_id. Il set di dati è sufficientemente piccolo da poterlo visualizzare tutto insieme.

L'elemento toDF() converte un oggetto DynamicFrame in un elemento DataFrame di Apache Spark in modo da poter applicare le trasformazioni già esistenti in Apache Spark SQL:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

Di seguito è riportato l'output:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

Digita quanto segue per visualizzare gli elementi organizations presenti nell'oggetto memberships:

memberships.select_fields(['organization_id']).toDF().distinct().show()

Di seguito è riportato l'output:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

Fase 5: unione dei dati

Usa quindi AWS Glue per unire queste tabelle relazionali e creare una tabella con la cronologia completa per l'oggetto memberships dei legislatori e i relativi elementi organizations.

  1. In primo luogo, unisci persons e memberships in id e person_id.

  2. Quindi, unisci il risultato a orgs in org_id e organization_id.

  3. Quindi, rilascia i campi ridondanti, person_id e org_id.

Puoi eseguire tutte queste operazioni in una sola riga di codice estesa:

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

L'output è il seguente:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

Ora hai la tabella finale che puoi utilizzare per l'analisi. Puoi scrivere in un formato compatto ed efficiente per le analisi dei dati, vale a dire Parquet, che puoi usare per eseguire SQL in AWS Glue, Amazon Athena o Amazon Redshift Spectrum.

La seguente chiamata scrive la tabella in più file per supportare le operazioni di lettura parallela veloce nella fase di analisi successiva:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

Per inserire tutti i dati cronologici in un singolo file, devi convertirli in un frame di dati, suddividerlo in partizioni e scriverlo:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

In alternativa, se vuoi separarlo dal Senato e dalla Camera:

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

Fase 6: trasformare i dati per i database relazionali

AWS Glue permette di scrivere in modo semplice i dati, anche semistrutturati, in database relazionali come Amazon Redshift,. Offre una trasformazione di tipo relationalize, che appiattisce gli elementi DynamicFrames indipendentemente dalla complessità degli oggetti in frame.

Utilizzando l_history DynamicFrame in questo esempio, passi il nome di una tabella radice (hist_root) e un percorso temporaneo a relationalize. Viene restituito un elemento DynamicFrameCollection. Puoi quindi elencare i nomi degli elementi DynamicFrames nella raccolta:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

Di seguito è riportato l'output della chiamata keys:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize suddivide la tabella della cronologia in sei nuove tabelle: una tabella radice che contiene un record per ogni oggetto dell'elemento DynamicFrame e le tabelle ausiliarie per le matrici. La gestione delle matrici nei database relazionali spesso non è ottimali, soprattutto quando le matrici diventano grandi. Separando le matrici in tabelle diverse velocizza l'esecuzione delle query.

A questo punto, controlla la separazione esaminando contact_details:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

Di seguito è riportato l'output della chiamata show:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

Il campo contact_details era una matrice di strutture nell'elemento DynamicFrame originale. Ogni elemento di tali matrici è una riga separata nella tabella ausiliaria, indicizzata da index. L'id qui è una chiave esterna nella tabella hist_root con la chiave contact_details:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

Di seguito è riportato l'output:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

In questi comandi vengono utilizzati toDF() e un'espressione where per filtrare le righe che vuoi vedere.

Quindi, unendo la tabella hist_root con le tabelle ausiliarie ti consente di effettuare le operazioni descritte di seguito.

  • Caricare i dati nei database senza il supporto di matrici.

  • Eseguire la query di ogni singolo elemento in una matrice con SQL.

Archivia e accedi in modo sicuro alle tue credenziali Amazon Redshift con una connessione AWS Glue. Per informazioni su come creare la tua connessione, vedi Connessione ai dati.

Siete ora pronti a scrivere i vostri dati su una connessione, scorrendo uno alla volta i DynamicFrames:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

Le impostazioni di connessione variano in base al tipo di database relazionale:

Conclusioni

Complessivamente, AWS Glue è molto flessibile. Con poche righe di codice, ti consente di eseguire ciò che normalmente ti potrebbe richiedere giorni di scrittura. Puoi trovare tutti gli script source-to-target ETL nel file Python join_and_relationalize.py negli esempi su. AWS Glue GitHub