本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
代码示例:对数据进行联接和关系化
本示例使用从 http://everypolitician.org/sample-dataset
存储桶的数据集:s3://awsglue-datasets/examples/us-legislators/all
。该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据(JSON 格式),并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。
您可以在 GitHub 网站上 AWS Glue 示例存储库join_and_relationalize.py
文件中找到本示例的源代码。
利用此数据,本教程将介绍如何执行以下操作:
使用 AWS Glue 爬网程序对存储在公有 Amazon S3 存储桶中的对象进行分类并将其架构保存到 AWS Glue Data Catalog。
检查生成自爬网的表元数据和架构。
-
编写 Python 提取、转移和加载(ETL)脚本,该脚本使用数据目录中的元数据执行以下操作:
将不同源文件中的数据加入到单个数据表中 (即,使数据非规范化)。
按议员类型筛选已加入到单独的表中的表。
将生成的数据写入单独的 Apache Parquet 文件以供日后分析。
在 AWS 上运行时,调试 Python 或 PySpark 脚本的首选方法是在 AWS Glue Studio 上使用笔记本。
步骤 1:爬取 Amazon S3 存储桶中的数据
-
登录 AWS Management Console 并打开位于 https://console.aws.amazon.com/glue/
的 AWS Glue 控制台。 -
按照 配置爬网程序 中的步骤操作,创建可将
s3://awsglue-datasets/examples/us-legislators/all
数据集网络爬取到 AWS Glue Data Catalog 中名为legislators
的数据库的新爬网程序。示例数据已位于此公共 Amazon S3 存储桶中。 -
运行新爬网程序,然后检查
legislators
数据库。该爬网程序将创建以下元数据表:
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
这是一个包含议员及其历史记录的半规范化表集合。
-
步骤 2:向开发终端节点笔记本中添加样板文件脚本
将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库,然后设置单个 GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
步骤 3:检查数据目录中数据的架构
接下来,您可以轻松地从 AWS Glue Data Catalog 检查 DynamicFrame,并检查数据的架构。例如,要查看 persons_json
表的架构,请在您的笔记本中添加以下内容:
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
下面是来自打印调用的输出:
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
表中的每个人都是某个美国国会机构的成员。
要查看 memberships_json
表的架构,请键入以下内容:
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
您可以在一个 (扩展) 代码行中执行所有这些操作:
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
organizations
是美国国会的党派和两大议院,即参议院和众议院。要查看 organizations_json
表的架构,请键入以下内容:
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
您可以在一个 (扩展) 代码行中执行所有这些操作:
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
步骤 4:筛选数据
接下来,仅保留您需要的字段,然后将 id
重命名为 org_id
。该数据集足够小,方便您完整查看。
toDF()
将 DynamicFrame
转换为 Apache Spark DataFrame
,因此您可以应用已存在于 Apache Spark SQL 中的转换:
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
下面显示了输出:
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
键入以下内容以查看显示在 memberships
中的 organizations
:
memberships.select_fields(['organization_id']).toDF().distinct().show()
下面显示了输出:
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
步骤 5:整合内容
现在,使用 AWS Glue 联接这些关系表并创建一个包含议员 memberships
及其对应的 organizations
的完整历史记录表。
-
首先,联接
id
和person_id
上的persons
和memberships
。 -
接下来,将结果与
org_id
和organization_id
上的orgs
联接。 -
然后,删除多余的字段
person_id
和org_id
。
您可以在一个 (扩展) 代码行中执行所有这些操作:
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
您可以在一个 (扩展) 代码行中执行所有这些操作:
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
您现在有了可用于分析的最终表。您可以采用紧凑且高效的格式写入该表以供分析(即 Parquet),该格式可让您在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 中运行 SQL。
以下调用将跨多个文件写入该表以在日后执行分析时支持快速并行读取:
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
要将所有历史记录数据放入单个文件,您必须将其转换为数据帧,为其重新分区然后写入它:
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
或者,如果您希望按参议院和众议院分隔该数据:
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
步骤 6:转换关系数据库的数据
利用 AWS Glue,您可以轻松将数据写入到关系数据库(如 Amazon Redshift),甚至对半结构化数据也是如此。它提供了一种转换 relationalize
,这将展平 DynamicFrames
,无论帧中的对象的复杂度可能如何。
使用本示例中的 l_history
DynamicFrame
,传入根表的名称 (hist_root
) 和 relationalize
的临时工作路径。这将返回 DynamicFrameCollection
。您随后可以列出该集合中 DynamicFrames
的名称:
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
以下是 keys
调用的输出:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
将该历史记录表拆分为 6 个新表:1 个包含 DynamicFrame
中每个对象的记录的根表以及 5 个用于数组的辅助表。关系数据库中的数组处理通常不够理想,尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。
接下来,通过检查 contact_details
来查看分隔:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
以下是 show
调用的输出:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
contact_details
字段是原始 DynamicFrame
中的一个结构数组。这些数组中的每个元素都是辅助表中的单独的行,通过 index
编制索引。此处的 id
是具有键 contact_details
的 hist_root
表的外键:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
下面是输出:
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
请注意,这些命令中先后使用了 toDF()
和 where
表示式来筛选您要查看的行。
因此,将 hist_root
表与辅助表联接可让您执行以下操作:
将数据加载到不支持数组的数据库中。
使用 SQL 查询数组中的每个单独的项目。
使用 AWS Glue 连接安全地存储和访问您的 Amazon Redshift 凭证。有关如何创建您自己的连接的信息,请参阅 连接到数据。
现在,您已准备就绪,可以通过一次遍历一个 DynamicFrames
来将数据写入到连接:
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
根据您的关系数据库类型,连接设置将有所不同:
-
有关写入到 Amazon Redshift 的说明,请参阅 Redshift 连接。
-
有关其他数据库,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。
结论
总的来说,AWS Glue 非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在 GitHub 上 join_and_relationalize.py
示例