Exemplo de código: juntar e relacionar dados - AWS Glue

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplo de código: juntar e relacionar dados

Este exemplo usa um conjunto de dados obtidos por download no site http://everypolitician.org/ para o bucket sample-dataset no Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all. O conjunto de dados contém dados no formato JSON sobre os legisladores dos Estados Unidos e os assentos que eles ocuparam na Câmara dos Deputados e no Senado Americano e foi ligeiramente modificado e disponibilizado em um bucket público do Amazon S3 para a finalidade deste tutorial.

Você pode encontrar o código-fonte para este exemplo no arquivo join_and_relationalize.py no repositório de exemplos do AWS Glue no site do GitHub.

Usando esses dados, você realizará os procedimentos a seguir:

  • Use um crawler do AWS Glue para classificar objetos armazenados em um bucket público do Amazon S3 e salvar seus esquemas no catálogo de dados do Glue da AWS.

  • Examine os metadados e esquemas da tabela resultantes do rastreamento.

  • Escreva um script Python de extração, transferência e carregamento (ETL) que usa os metadados no Data Catalog para fazer o seguinte:

    • Juntar dados nos diferentes arquivos de fonte em uma única tabela de dados (ou seja, desnormalizar os dados).

    • Filtrar a tabela reunida em tabelas separadas por tipo de legislador.

    • Gravar dados resultantes em arquivos Apache Parquet separados para análise posterior.

A forma preferida de depurar scripts do Python ou do PySpark durante a execução na AWS é usar cadernos no AWS Glue Studio.

Etapa 1: crawling de dados no bucket do Amazon S3

  1. Faça login no AWS Management Console e abra o console do AWS Glue em https://console.aws.amazon.com/glue/.

  2. Siga as etapas em Configurar um crawler para criar um novo crawler que possa fazer crawling no conjunto de dados s3://awsglue-datasets/examples/us-legislators/all em um banco de dados nomeado legislators no AWS Glue Data Catalog. Os dados de exemplo já estão nesse bucket público do Amazon S3.

  3. Execute o novo crawler e, em seguida, verifique o banco de dados legislators.

    O crawler cria as seguintes tabelas de metadados:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    Esta é uma coleção seminormalizada de tabelas que contêm legisladores e suas histórias.

Etapa 2: adicionar o script de boilerplate ao caderno do endpoint de desenvolvimento

Cole o seguinte script de boilerplate no caderno do endpoint de desenvolvimento para importar as bibliotecas do AWS Glue necessárias e configure um único GlueContext:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

Etapa 3: examinar os esquemas dos dados no Data Catalog

Em seguida, você pode facilmente criar e examinar um DynamicFrame a partir do AWS Glue Data Catalog e examinar os esquemas dos dados. Por exemplo, para ver o esquema da tabela persons_json, adicione o seguinte ao seu caderno:

persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()

Veja a seguir a saída das chamadas de impressão:

Count: 1961 root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Cada pessoa na tabela é membro de algum órgão do Congresso dos EUA.

Para visualizar o esquema da tabela memberships_json, digite o seguinte:

memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()

A saída é a seguinte:

Count: 10439 root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string

As organizations são partidos e as duas Câmaras do Congresso, o Senado e a Câmara dos Deputados. Para visualizar o esquema da tabela organizations_json, digite o seguinte:

orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()

A saída é a seguinte:

Count: 13 root |-- classification: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- name: string |-- seats: int |-- type: string

Etapa 4: filtrar os dados

Em seguida, mantenha apenas os campos desejados e renomeie id como org_id. O conjunto de dados é pequeno o suficiente para que você possa vê-lo por completo.

O toDF() converte um DynamicFrame em um DataFrame do Apache Spark, para que você possa aplicar as transformações já existentes no Apache Spark SQL:

orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()

Veja a saída a seguir:

+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ |classification| org_id| org_name| links|seats| type| image| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+ | party| party/al| AL| null| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null| | legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null| | party| party/independent| Independent| null| null| null| null| | party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...| | party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null| | party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...| | party| party/independent| Independent| null| null| null| null| | party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...| | legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null| +--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

Digite o seguinte para visualizar organizations que aparecem em memberships:

memberships.select_fields(['organization_id']).toDF().distinct().show()

Veja a saída a seguir:

+--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

Etapa 5: integrar tudo

Agora, use o AWS Glue para juntar essas tabelas relacionais e criar uma tabela de histórico completa do legislador memberships e suas correspondentes organizations.

  1. Primeiro, junte persons e memberships em id e person_id.

  2. Em seguida, junte o resultado com orgs em org_id e organization_id.

  3. Coloque os campos redundantes person_id e org_id.

Você pode fazer todas essas operações em uma linha de código (extensa):

l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()

A saída é a seguinte:

Count: 10439 root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- death_date: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- family_name: string |-- id: string |-- start_date: string |-- end_date: string

Agora você tem a tabela final que pode usar para análise. Você pode gravar em um formato compacto e eficiente para análise (ou seja, Parquet) que permita a execução de SQL no AWS Glue, no Amazon Athena ou no Amazon Redshift Spectrum.

A seguinte chamada grava a tabela em vários arquivos para suportar leituras paralelas rápidas ao fazer a análise posteriormente:

glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")

Para colocar todos os dados do histórico em um único arquivo, você precisa convertê-los em um quadro de dados, reparticioná-los e gravá-los:

s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

Se preferir, você pode separá-los por "Senado" e "Câmara":

l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])

Etapa 6: transformar os dados para bancos de dados relacionais

O AWS Glue facilita a gravação dos dados em bancos de dados relacionais, como o Amazon RedShift, mesmo com dados semiestruturados. Ele oferece uma transformação relationalize, que nivela DynamicFrames independentemente da complexidade dos objetos no quadro.

Usando o l_history DynamicFrame neste exemplo, transmita o nome da tabela raiz (hist_root) e um caminho de trabalho temporário para relationalize. Isso retorna um DynamicFrameCollection. Você pode então os nomes dos DynamicFrames nessa coleção:

dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()

Veja a seguir a saída da chamada keys:

[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize quebrou a tabela de histórico em seis novas tabelas: uma tabela raiz que contém um registro para cada objeto no DynamicFrame, e tabelas auxiliares para as matrizes. O gerenciamento de matrizes em bancos de dados relacionais geralmente é pouco satisfatório, principalmente porque essas matrizes ficam grandes. Separar as matrizes em diferentes tabelas torna as consultas muito mais rápidas.

Em seguida, verifique a separação examinando contact_details:

l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

Veja a seguir a saída da chamada show:

root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| | | 10| 1| | 202-225-1314| | 10| 2| phone| | | 10| 3| | 202-225-3772| | 10| 4| twitter| | | 10| 5| | MikeRossUpdates| | 75| 0| fax| | | 75| 1| | 202-225-7856| | 75| 2| phone| | | 75| 3| | 202-225-2711| | 75| 4| twitter| | | 75| 5| | SenCapito| +---+-----+------------------------+-------------------------+

O campo contact_details era uma matriz de estruturas no DynamicFrame original. Cada elemento dessas matrizes é uma linha separada na tabela auxiliar, indexada por index. Aqui, id é uma chave externa na tabela hist_root com a chave contact_details:

dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()

Esta é a saída:

+--------------------+----------+-----------+---------------+ | id|given_name|family_name|contact_details| +--------------------+----------+-----------+---------------+ |f4fc30ee-7b42-432...| Mike| Ross| 10| |e3c60f34-7d1b-4c0...| Shelley| Capito| 75| +--------------------+----------+-----------+---------------+

Nestes comandos, as expressões toDF() e where são usadas ​​para filtrar as linhas que você quer ver.

Portanto, ao juntar a tabela hist_root e as tabelas auxiliares, você pode fazer o seguinte:

  • Carregar dados em bancos de dados sem suporte de matriz.

  • Consultar cada item individual em uma matriz usando o SQL.

Armazene e acesse com segurança suas credenciais do Amazon Redshift com uma conexão do AWS Glue. Para obter informações sobre como criar suas próprias conexões, consulte Conectar a dados.

Agora está tudo pronto para gravar seus dados em uma conexão alternando por um DynamicFrames de cada vez:

for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

Suas configurações de conexão serão diferentes de acordo com o tipo de banco de dados relacional:

Conclusão

De modo geral, o AWS Glue é muito flexível. Com ele, você usa algumas linhas de código para escrever o que normalmente levaria dias. Você pode encontrar todos os scripts de ETL de fonte para destino no arquivo Python join_and_relationalize.py nos exemplos do AWS Glue no GitHub.