As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Exemplo de código: juntar e relacionar dados
Este exemplo usa um conjunto de dados obtidos por download no site http://everypolitician.org/sample-dataset
no Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all
. O conjunto de dados contém dados no formato JSON sobre os legisladores dos Estados Unidos e os assentos que eles ocuparam na Câmara dos Deputados e no Senado Americano e foi ligeiramente modificado e disponibilizado em um bucket público do Amazon S3 para a finalidade deste tutorial.
Você pode encontrar o código-fonte para este exemplo no arquivo join_and_relationalize.py
no repositório de exemplos do AWS Glue
Usando esses dados, você realizará os procedimentos a seguir:
Use um crawler do AWS Glue para classificar objetos armazenados em um bucket público do Amazon S3 e salvar seus esquemas no catálogo de dados do Glue da AWS.
Examine os metadados e esquemas da tabela resultantes do rastreamento.
-
Escreva um script Python de extração, transferência e carregamento (ETL) que usa os metadados no Data Catalog para fazer o seguinte:
Juntar dados nos diferentes arquivos de fonte em uma única tabela de dados (ou seja, desnormalizar os dados).
Filtrar a tabela reunida em tabelas separadas por tipo de legislador.
Gravar dados resultantes em arquivos Apache Parquet separados para análise posterior.
Etapa 1: crawling de dados no bucket do Amazon S3
-
Faça login no AWS Management Console e abra o console do AWS Glue em https://console.aws.amazon.com/glue/
. -
Siga as etapas em Configurar um crawler para criar um novo crawler que possa fazer crawling no conjunto de dados
s3://awsglue-datasets/examples/us-legislators/all
em um banco de dados nomeadolegislators
no AWS Glue Data Catalog. Os dados de exemplo já estão nesse bucket público do Amazon S3. -
Execute o novo crawler e, em seguida, verifique o banco de dados
legislators
.O crawler cria as seguintes tabelas de metadados:
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
Esta é uma coleção seminormalizada de tabelas que contêm legisladores e suas histórias.
-
Etapa 2: adicionar o script de boilerplate ao caderno do endpoint de desenvolvimento
Cole o seguinte script de boilerplate no caderno do endpoint de desenvolvimento para importar as bibliotecas do AWS Glue necessárias e configure um único GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Etapa 3: examinar os esquemas dos dados no Data Catalog
Em seguida, você pode facilmente criar e examinar um DynamicFrame a partir do AWS Glue Data Catalog e examinar os esquemas dos dados. Por exemplo, para ver o esquema da tabela persons_json
, adicione o seguinte ao seu caderno:
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
Veja a seguir a saída das chamadas de impressão:
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Cada pessoa na tabela é membro de algum órgão do Congresso dos EUA.
Para visualizar o esquema da tabela memberships_json
, digite o seguinte:
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
A saída é a seguinte:
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
As organizations
são partidos e as duas Câmaras do Congresso, o Senado e a Câmara dos Deputados. Para visualizar o esquema da tabela organizations_json
, digite o seguinte:
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
A saída é a seguinte:
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
Etapa 4: filtrar os dados
Em seguida, mantenha apenas os campos desejados e renomeie id
como org_id
. O conjunto de dados é pequeno o suficiente para que você possa vê-lo por completo.
O toDF()
converte um DynamicFrame
em um DataFrame
do Apache Spark, para que você possa aplicar as transformações já existentes no Apache Spark SQL:
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
Veja a saída a seguir:
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
Digite o seguinte para visualizar organizations
que aparecem em memberships
:
memberships.select_fields(['organization_id']).toDF().distinct().show()
Veja a saída a seguir:
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
Etapa 5: integrar tudo
Agora, use o AWS Glue para juntar essas tabelas relacionais e criar uma tabela de histórico completa do legislador memberships
e suas correspondentes organizations
.
-
Primeiro, junte
persons
ememberships
emid
eperson_id
. -
Em seguida, junte o resultado com
orgs
emorg_id
eorganization_id
. -
Coloque os campos redundantes
person_id
eorg_id
.
Você pode fazer todas essas operações em uma linha de código (extensa):
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
A saída é a seguinte:
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
Agora você tem a tabela final que pode usar para análise. Você pode gravar em um formato compacto e eficiente para análise (ou seja, Parquet) que permita a execução de SQL no AWS Glue, no Amazon Athena ou no Amazon Redshift Spectrum.
A seguinte chamada grava a tabela em vários arquivos para suportar leituras paralelas rápidas ao fazer a análise posteriormente:
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
Para colocar todos os dados do histórico em um único arquivo, você precisa convertê-los em um quadro de dados, reparticioná-los e gravá-los:
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
Se preferir, você pode separá-los por "Senado" e "Câmara":
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
Etapa 6: transformar os dados para bancos de dados relacionais
O AWS Glue facilita a gravação dos dados em bancos de dados relacionais, como o Amazon RedShift, mesmo com dados semiestruturados. Ele oferece uma transformação relationalize
, que nivela DynamicFrames
independentemente da complexidade dos objetos no quadro.
Usando o l_history
DynamicFrame
neste exemplo, transmita o nome da tabela raiz (hist_root
) e um caminho de trabalho temporário para relationalize
. Isso retorna um DynamicFrameCollection
. Você pode então os nomes dos DynamicFrames
nessa coleção:
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
Veja a seguir a saída da chamada keys
:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
quebrou a tabela de histórico em seis novas tabelas: uma tabela raiz que contém um registro para cada objeto no DynamicFrame
, e tabelas auxiliares para as matrizes. O gerenciamento de matrizes em bancos de dados relacionais geralmente é pouco satisfatório, principalmente porque essas matrizes ficam grandes. Separar as matrizes em diferentes tabelas torna as consultas muito mais rápidas.
Em seguida, verifique a separação examinando contact_details
:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
Veja a seguir a saída da chamada show
:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
O campo contact_details
era uma matriz de estruturas no DynamicFrame
original. Cada elemento dessas matrizes é uma linha separada na tabela auxiliar, indexada por index
. Aqui, id
é uma chave externa na tabela hist_root
com a chave contact_details
:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
Esta é a saída:
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
Nestes comandos, as expressões toDF()
e where
são usadas para filtrar as linhas que você quer ver.
Portanto, ao juntar a tabela hist_root
e as tabelas auxiliares, você pode fazer o seguinte:
Carregar dados em bancos de dados sem suporte de matriz.
Consultar cada item individual em uma matriz usando o SQL.
Armazene e acesse com segurança suas credenciais do Amazon Redshift com uma conexão do AWS Glue. Para obter informações sobre como criar suas próprias conexões, consulte Conectar a dados.
Agora está tudo pronto para gravar seus dados em uma conexão alternando por um DynamicFrames
de cada vez:
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
Suas configurações de conexão serão diferentes de acordo com o tipo de banco de dados relacional:
-
Para obter instruções sobre como gravar no Amazon Redshift, consulte Conexões do Redshift.
-
Para outros bancos de dados, consulte Tipos e opções de conexão para ETL no AWS Glue para Spark.
Conclusão
De modo geral, o AWS Glue é muito flexível. Com ele, você usa algumas linhas de código para escrever o que normalmente levaria dias. Você pode encontrar todos os scripts de ETL de fonte para destino no arquivo Python join_and_relationalize.py
nos exemplos do AWS Glue