本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
程式碼範例:加入和關聯化資料
本範例使用從 http://everypolitician.org/sample-dataset
儲存貯體的資料集:s3://awsglue-datasets/examples/us-legislators/all
。資料集包含美國國會議員和他們在美國眾議院和參議院內座位的 JSON 格式的資料,已針對教學用途稍作修改,並透過公有 Amazon S3 儲存貯體提供。
您可以在網站範例儲存庫join_and_relationalize.py
檔案中找到此範例的AWS Glue原始程式 GitHub 碼。
本指南將利用這項資料告訴您如何執行下列動作:
使用AWS Glue爬蟲程式將存放在公用 Amazon S3 儲存貯體中的物件分類,並將其結構描述儲存到 AWS Glue 資料型錄中。
檢查爬蟲程式所產生的資料表中繼資料和結構描述。
-
編寫 Python 擷取、傳輸和載入 (ETL) 指令碼,使用 Data Catalog 中的中繼資料執行下列動作:
將來自不同原始檔案的資料加入到單一資料表 (也就是將資料去正規化)。
篩選加入的資料表,依國會議員類型放入不同的資料表。
將產生的資料寫入到單獨的 Apache Parquet 檔案中,供以後分析之用。
在執行時偵錯 Python 或 PySpark 指令碼的建議方式 AWS 是在 AWS Glue Studio 上使用筆記型電腦。
步驟 1:在 Amazon S3 儲存貯體中網路爬取資料
-
請登入 AWS Management Console,然後開啟AWS Glue主控台,網址為 https://console.aws.amazon.com/glue/
。 -
遵循中的步驟設定爬蟲程式,建立新的爬行者程式,以將資料
s3://awsglue-datasets/examples/us-legislators/all
集編目到 AWS Glue 資料目錄legislators
中名稱的資料庫。範例資料已放在這個公有 Amazon S3 儲存貯體中。 -
執行新的爬蟲程式,接著查看
legislators
資料庫。爬蟲程式建立下列中繼資料資料表:
-
persons_json
-
memberships_json
-
organizations_json
-
events_json
-
areas_json
-
countries_r_json
這是一個包含國會議員和其歷史的半標準化資料表集合。
-
步驟 2:新增樣板指令碼至開發端點筆記本
將以下樣板指令碼貼至開發端點以匯入您需要的 AWS Glue 程式庫,並且設定單一的 GlueContext
:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
步驟 3:從資料目錄中的資料檢查結構描述
接下來,您可以輕鬆地 DynamicFrame 從 AWS Glue 資料型錄建立檢查,並檢查資料的結構描述。例如,若要查看 persons_json
資料表的結構描述,請將下列內容新增到筆記本:
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
以下為列印呼叫的輸出:
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
資料表中的每個人都是美國國會的成員。
若要檢視 memberships_json
資料表的結構描述,請輸入如下命令:
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
其輸出如下:
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
organizations
為政黨和參議院與眾議院這兩個議會殿堂。若要檢視 organizations_json
資料表的結構描述,請輸入如下命令:
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
其輸出如下:
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
步驟 4:篩選資料
接著,保留需要的欄位,將 id
重新命名為 org_id
。資料集很小,可以從整體來檢視。
toDF()
會將 DynamicFrame
轉換為 Apache Spark DataFrame
,因此您可套用 Apache Spark SQL 中現有的轉換:
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
以下將顯示輸出:
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
輸入以下以檢視出現在 memberships
的 organizations
:
memberships.select_fields(['organization_id']).toDF().distinct().show()
以下將顯示輸出:
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
步驟 5:全部整合為一
現在,使用 AWS Glue 加入這些關聯式表格,並建立一份關於國會議員 memberships
及其對應的 organizations
的完整歷史記錄資料表。
-
首先,加入
persons
和memberships
的id
和person_id
。 -
接著,將結果加入到
orgs
的org_id
和organization_id
。 -
然後,捨棄冗餘欄位
person_id
和org_id
。
您可以在同一 (延伸) 指令碼行執行所有這些操作:
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
其輸出如下:
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
您現在取得最終的資料表,可用於分析。您可以用精巧、有效率的格式編寫,以用於分析 (也就是 Parquet),在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 上執行 SQL。
以下呼叫將資料表編寫到多個檔案,在稍後執行分析時支援快速平行讀取:
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
若要將所有歷史記錄資料合併成單一檔案,您必須將其轉換為資料框架、分割並編寫:
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
或者,如果您希望將其分為參議院和眾議院:
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
步驟 6:轉換關聯式資料庫的資料
AWS Glue 可讓您輕鬆地將資料寫入關聯式資料庫,例如 Amazon Redshift,即使是半結構化資料。其提供轉換 relationalize
,可將 DynamicFrames
扁平化,無論框架中的物件多複雜。
使用本範例中的 l_history
DynamicFrame
,以根資料表 (hist_root
) 的名稱和暫時任務路徑傳送至 relationalize
。將傳回 DynamicFrameCollection
。然後,您可以將 DynamicFrames
的名稱列在該集合中:
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
以下為 keys
呼叫的輸出:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize
將歷史記錄資料表分成六個新資料表:根資料表包含在 DynamicFrame
中的各物件記錄,和陣列的輔助資料表。關聯式資料庫中的陣列處理通常為次最佳化,尤其在這些陣列變得龐大時。將陣列分成不同的資料表,可加快查詢速度。
接著,檢查 contact_details
以查看分隔:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
以下為 show
呼叫的輸出:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
contact_details
欄位為原始 DynamicFrame
中結構的陣列。這些陣列的每個元素都是輔助資料表中的單獨資料列,以 index
編製索引。此處的 id
是 hist_root
資料表的外部金鑰,金鑰為 contact_details
:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
以下為其輸出:
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
請注意,這些命令將使用 toDF()
,然後是 where
表達式,來篩選您想要查看的資料列。
因此,加入 hist_root
資料表與輔助資料表可執行下列動作:
無需陣列支援便能將資料載入到資料庫。
使用 SQL 查詢陣列中的每個個別項目。
使用 AWS Glue 連線以安全存放和存取您的 Amazon Redshift 憑證。有關如何建立自己的連線的詳細資訊,請參閱 連線至資料。
您現已準備好透過一次循環一個 DynamicFrames
來將資料寫入連接器:
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
connection settings here
)
您的連接器設定將因您的關聯式資料庫類型而異:
-
如需有關寫入 Amazon Redshift 的指示,請參閱Redshift 連線。
-
若為其他資料庫,請參閱 ETL 中 的連線類型和選項 AWS Glue 適用於 Spark。
結論
整體而言,AWS Glue 極具彈性。它讓您用幾行程式碼便能完成通常要好幾天才能完成撰寫的任務。您可以在上 GitHub的AWS Glue範例join_and_relationalize.py
中的整個 source-to-target ETL 指令碼。