Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Código de ejemplo: unión de los datos y establecimiento de relaciones entre ellos
En este ejemplo se utiliza un conjunto de datos que se ha descargado desde http://everypolitician.org/sample-dataset
de Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all
. El conjunto de datos contiene información en formato JSON sobre legisladores de los Estados Unidos y los escaños de los que han sido titulares en la Cámara de Representantes y el Senado. Para realizar este tutorial, este conjunto de datos se ha modificado ligeramente y está disponible en un bucket público de Amazon S3.
Puede encontrar el código fuente de este ejemplo en el archivo join_and_relationalize.py
del repositorio de ejemplos de AWS Glue
Con estos datos, el presente tutorial le enseña a realizar las siguientes tareas:
Utilice un rastreador de AWS Glue para clasificar los objetos que están almacenados en un bucket público de Amazon S3 y guardar sus esquemas en el Catálogo de datos de AWS Glue.
Examinar los metadatos y los esquemas de la tabla que se obtienen a partir del rastreo.
-
Escriba un script de extracción, transferencia y carga (ETL) de Python que utilice los metadatos del Catálogo de datos para hacer lo siguiente:
Unir los datos de los diferentes archivos de origen juntos en una única tabla de datos (es decir, desnormalizar los datos).
Desglosar la tabla unida en diferentes tablas según el tipo de legislador.
Escribir los datos resultantes en archivos Apache Parquet independientes para realizar un análisis posteriormente.
La forma preferida de depurar scripts de Python o PySpark mientras se están ejecutando en AWS es usar Notebooks en Glue Studio AWS.
Paso 1: Rastrear los datos del bucket de Amazon S3
-
Luego, inicie sesión en la AWS Management Console y abra la consola de AWS Glue en https://console.aws.amazon.com/glue/
. -
En función de los pasos que se indican en Configuración de rastreadores, cree un nuevo rastreador que pueda rastrear el conjunto de datos
s3://awsglue-datasets/examples/us-legislators/all
en una base de datos denominadalegislators
en el Catálogo de datos de AWS Glue. Los datos de ejemplo ya están en este bucket público de Amazon S3. -
Ejecute el nuevo rastreador y, a continuación, compruebe la base de datos
legislators
.El rastreador crea las siguientes tablas de metadatos:
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
Esta es una colección seminormalizada de tablas que contienen legisladores y sus historias.
-
Paso 2: Añadir un script reutilizable al cuaderno del punto de conexión de desarrollo
Pegue el script reutilizable siguiente en el cuaderno del punto de enlace de desarrollo para importar las bibliotecas de AWS Glue que necesite y configurar un único GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Paso 3: Examinar los esquemas de los datos del Catálogo de datos
A continuación, puede crear un DynamicFrame con facilidad desde el Catálogo de datos de AWS Glue y examinar los esquemas de los datos. Por ejemplo, para ver el esquema de la tabla persons_json
, añada lo siguiente en su cuaderno:
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
Esta es la salida de las llamadas impresas:
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Cada persona de la tabla es miembro de algún órgano del congreso de los Estados Unidos.
Para ver el esquema de la tabla memberships_json
, escriba lo siguiente:
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
La salida es la siguiente:
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
Las organizations
son partes y las dos cámaras del Congreso, el Senado y la Cámara de Representantes. Para ver el esquema de la tabla organizations_json
, escriba lo siguiente:
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
La salida es la siguiente:
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
Paso 4: Filtrar los datos
A continuación, mantenga solo los campos que desee y cambie el nombre id
por org_id
. El conjunto de datos es lo suficientemente pequeño para ver todo el conjunto.
toDF()
convierte a DynamicFrame
en un elemento DataFrame
de Apache Spark, por lo que puede aplicar las transformaciones que ya existen en Apache Spark SQL:
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
El ejemplo siguiente muestra la salida:
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
Escriba lo siguiente para ver las organizations
que aparecen en memberships
:
memberships.select_fields(['organization_id']).toDF().distinct().show()
El ejemplo siguiente muestra la salida:
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
Paso 5: Reunirlo todo
Ahora utilice AWS Glue para unir estas tablas relacionales y crear una tabla del historial completo de memberships
del legislador y sus organizations
correspondientes.
-
En primer lugar, una
persons
ymemberships
enid
yperson_id
. -
A continuación, una el resultado con
orgs
enorg_id
yorganization_id
. -
A continuación, anule los campos redundantes
person_id
yorg_id
.
Puede realizar todas estas operaciones en una línea de código (extendida):
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
La salida es la siguiente:
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
Ahora tiene la tabla definitiva que puede utilizar para su análisis. Puede escribirla en un formato compacto y eficiente para el análisis, por ejemplo, en Parquet, en el que puede ejecutar SQL en AWS Glue, Amazon Athena o Amazon Redshift Spectrum.
La siguiente llamada escribe la tabla en varios archivos para admitir lecturas paralelas rápidas al realizar el análisis posterior:
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
Para poner todos los datos de historial en un único archivo, debe convertirlos en una estructura de datos, crear nuevas particiones y escribirlos:
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
O, si desea separarlos por Senado y Cámara:
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
Paso 6: Transformar los datos para bases de datos relacionales
AWS Glue facilita la tarea de escribir los datos en bases de datos relacionales como Amazon Redshift, incluso con datos semiestructurados. Ofrece una transformación relationalize
, que aplana DynamicFrames
sea cual sea la complejidad de los objetos de la trama.
Utilizando el elemento l_history
DynamicFrame
de este ejemplo, pase el nombre de una tabla (hist_root
) y una ruta de flujo de trabajo temporal a relationalize
. Esto devuelve un elemento DynamicFrameCollection
. A continuación, puede enumerar los nombres de DynamicFrames
en esa colección:
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
A continuación, se muestra la salida de la llamada keys
:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
ha desglosado la tabla de historial en seis tablas nuevas: una tabla raíz que contiene un registro por cada objeto de DynamicFrame
y tablas auxiliares para las matrices. A menudo, la gestión de matrices en las bases de datos relacionales no tiene un nivel óptimo, en particular cuando dichas matrices se hacen más grandes. Si se separan las matrices en diferentes tablas, las consultas serán mucho más rápidas.
A continuación, céntrese en la separación examinando contact_details
:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
A continuación, se muestra la salida de la llamada show
:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
El campo contact_details
era una matriz de estructuras en el elemento DynamicFrame
original. Cada elemento de estas matrices es una fila independiente de la tabla auxiliar, indizada por index
. Aquí el id
es una clave externa en la tabla hist_root
con la clave contact_details
:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
Se genera la salida siguiente:
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
Observe en estos comandos que se utiliza toDF()
y, a continuación, una expresión where
, para filtrar y obtener las filas que desea ver.
Por lo tanto, unir la tabla hist_root
con las tablas auxiliares le permite hacer lo siguiente:
Cargar datos en bases de datos sin compatibilidad de matrices.
Consultar cada elemento individual de una matriz con SQL.
Almacene las credenciales de Amazon Redshift y acceda a ellas de forma segura con una conexión de AWS Glue. Para obtener información acerca de cómo crear su propia conexión, consulte Conexión a datos.
Ahora está preparado para escribir los datos en una conexión pasando por los elementos DynamicFrames
de uno en uno:
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
La configuración de la conexión variará en función del tipo de base de datos relacional:
-
Para obtener instrucciones sobre cómo escribir en Amazon Redshift, consulte Conexiones Redshift.
-
Para otras bases de datos, consulte Tipos de conexión y opciones para ETL en AWS Glue para Spark.
Conclusión
En general, AWS Glue es muy flexible. Le permite realizar en unas cuantas líneas de código lo que normalmente tardaría varios días en escribirse. Puede encontrar todos los scripts de ETL de origen a destino en el archivo de Python join_and_relationalize.py
, en los ejemplos de AWS Glue