本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
教學課程:撰寫 Spark 指令碼的 AWS Glue
本教學將向您介紹撰寫 AWS Glue 指令碼的程序。您可以按照任務的排程執行指令碼,也可以與互動式工作階段互動執行。如需任務的詳細資訊,請參閱建立視覺ETL工作 AWS Glue Studio。如需互動式工作階段的詳細資訊,請參閱概觀 AWS Glue 互動式工作階段。
AWS Glue Studio 視覺化編輯器提供無程式碼的圖形介面,可用於建置 AWS Glue 工作。 AWS Glue 腳本回視覺作業。您可透過這些指令碼存取一組經過擴增的工具,這些工具能與 Apache Spark 程式搭配運作。您可以存取原生 Spark API,以及 AWS Glue 程式庫,以便從 G AWS lue 指令碼中擷取、轉換和載入 (ETL) 工作流程。
本教學課程中,您會擷取、轉換及載入違停罰單資料集。執行這項工作的指令碼在形式和功能上與 AWS 大數據部落格上使用 AWS Glue Studio 讓 ETL 更容易
在本教學課程中,您將使用 Python 語言和程式庫。類似的功能在 Scala 中可用。在完成本教程之後,您應該能夠生成並檢查 Scala 示例腳本,以了解如何執行 Scala AWS Glue ETL 腳本編寫過程。
必要條件
本教學課程具備下列先決條件:
-
與 AWS Glue Studio 部落格文章指示您執行 AWS CloudFormation 範本的必要條件相同。
此範本使用 AWS Glue 資料目錄來管理中可用的停車票資料集
s3://aws-bigdata-blog/artifacts/gluestudio/
。它建立將被引用以下資源: -
AWS Glue StudioRole:要執行 AWS Glue 任務的 IAM 角色
-
AWS Glue StudioAmazon S3Bucket:儲存部落格相關檔案的 Amazon S3 儲存貯體名稱
-
AWS Glue StudioTicketsYYZDB:AWS Glue Data Catalog 資料庫
-
AWS Glue StudioTableTickets— 用作來源的資料目錄表
-
AWS Glue StudioTableTrials— 用作來源的資料目錄表
-
AWS Glue StudioParkingTicketCount — 要用作目標的資料目錄表
-
在 AWS Glue 工作室部落格文章中產生的指令碼。如果部落格文章變更,指令碼在下列文字中同樣可用。
產生範例指令碼
您可以使用 AWS Glue Studio 視覺化編輯器做為強大的程式碼產生工具,為您要撰寫的指令碼建立支架。您將使用此工具建立範例指令碼。
如果您想跳過這些步驟,系統會提供指令碼。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 bucket S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog( database="yyz-tickets", table_name="tickets", transformation_ctx="S3bucket_node1" ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[ ("tag_number_masked", "string", "tag_number_masked", "string"), ("date_of_infraction", "string", "date_of_infraction", "string"), ("ticket_date", "string", "ticket_date", "string"), ("ticket_number", "decimal", "ticket_number", "float"), ("officer", "decimal", "officer_name", "decimal"), ("infraction_code", "decimal", "infraction_code", "decimal"), ("infraction_description", "string", "infraction_description", "string"), ("set_fine_amount", "decimal", "set_fine_amount", "float"), ("time_of_infraction", "decimal", "time_of_infraction", "decimal"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node S3 bucket S3bucket_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="glueparquet", connection_options={"path": "s3://
DOC-EXAMPLE-BUCKET
", "partitionKeys": []}, format_options={"compression": "gzip"}, transformation_ctx="S3bucket_node3", ) job.commit()
產生範例指令碼
-
完成 AWS Glue 工作室教學課程。若要完成此教學課程,請參閱從範例工作在 AWS Glue Studio 中建立工作。
-
導覽至任務頁面上的 Script (指令碼) 索引標籤,如以下螢幕擷取畫面所示:
-
複製 Script (指令碼) 索引標籤中的完整內容。透過在 Job details (任務詳細資訊) 中設定指令碼語言,您可以在產生 Python 程式碼或 Scala 程式碼之間來回切換。
步驟 1. 建立任務並貼上指令碼
在此步驟中,您要在中建立 AWS Glue 工作 AWS Management Console。這會設定允許 AWS Glue 執行指令碼的組態。此操作還會為您建立一個存放和編輯指令碼的地方。
建立任務
-
在中 AWS Management Console,導覽至 AWS Glue 登陸頁面。
-
在側邊的導覽窗格中,選擇 Jobs (任務)。
-
在 Create job (建立任務) 中選擇 Spark script editor (Spark 指令碼編輯器),接著選擇 Create (建立)。
-
選用 – 將指令碼的完整文字貼入 Script (指令碼) 窗格中。或者,您可以按照教學課程進行操作。
步驟 2. 匯入 AWS Glue 程式庫
您需要設定指令碼,使其與在指令碼外部定義的程式碼和組態互動。這項工作是在 AWS Glue 工作室幕後完成的。
在此步驟中,您會執行下列動作。
-
匯入並初始化
GlueContext
物件。從指令碼編寫的角度來看,這是最重要的匯入。這會公開定義來源和目標資料集所使用的標準方法,也就是任何 ETL 指令碼的起點。若要進一步了解GlueContext
類別,請參閱 GlueContext 類。 -
初始化
SparkContext
和SparkSession
。這些可讓您配置 AWS Glue 工作內可用的 Spark 引擎。您不需要直接在介紹性 AWS Glue 指令碼中使用它們。 -
呼叫
getResolvedOptions
,準備您的任務參數以在指令碼內使用。如需有關解析任務參數的詳細資訊,請參閱 使用 getResolvedOptions 存取參數。 -
初始化
Job
。該Job
對象設置配置並跟踪各種可選 AWS Glue 功能的狀態。您的指令碼可以在沒有Job
物件的情況下執行,但最佳實務是將其初始化,以免之後整合這些功能時造成混淆。其中一項功能是任務書籤,您可以在此教學課程中選擇性地設定該功能。您可以在以下章節中了解有關任務書籤的資訊:選用 – 啟用任務書籤。
在此程序中,您需編寫下列程式碼。此程式碼是產生的範例指令碼的一部分。
from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args)
若要匯入 AWS Glue 程式庫
-
複製此段程式碼並貼入 Script (指令碼) 編輯器中。
注意
您或許會認為,複製程式碼是一種不好的工程實務。在本教學課程中,我們建議您在所有 AWS Glue ETL 指令碼中持續命名核心變數。
步驟 3。從來源擷取資料
在任何 ETL 程序中,您都需要先定義要變更的來源資料集。在 AWS Glue Studio 視覺化編輯器中,您可以透過建立來源節點來提供此資訊。
在此步驟中,您需為 create_dynamic_frame.from_catalog
方法提供 database
和 table_name
,以便從 AWS Glue 資料目錄所設定的來源擷取資料。
在先前的步驟中,您已初始化 GlueContext
物件。您可以使用此物件來尋找設定來源所需使用的方法,例如 create_dynamic_frame.from_catalog
。
在此程序中,您需使用 create_dynamic_frame.from_catalog
編寫下列程式碼。此程式碼是產生的範例指令碼的一部分。
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog( database="yyz-tickets", table_name="tickets", transformation_ctx="S3bucket_node1" )
從來源擷取資料
-
檢查文件以尋找從 AWS Glue 資料型錄中定義的來源擷取資料的方法。
GlueContext
這些方法記錄於 GlueContext 類。選擇 create_dynamic_frame.from_catalog 方法。在glueContext
上呼叫此方法。 -
檢查
create_dynamic_frame.from_catalog
的文件。此方法需要database
和table_name
參數。為create_dynamic_frame.from_catalog
提供必要的參數。AWS Glue 資料型錄會儲存來源資料位置和格式的相關資訊,並在必要條件區段中進行設定。您不必直接為指令碼提供該資訊。
-
選用 – 為該方法提供
transformation_ctx
參數,以支援任務書籤。您可以在以下章節中了解有關任務書籤的資訊:選用 – 啟用任務書籤。
注意
擷取資料的常用方法
create_dynamic_frame_from_catalog用於連線至 AWS Glue 資料型錄中的表格。
如果您需要直接為任務提供描述來源結構和位置的組態,請參閱 create_dynamic_frame_from_options 方法。您需要提供比使用 create_dynamic_frame.from_catalog
時更詳細的參數以描述您的資料。
請參閱有關 format_options
和 connection_parameters
的補充文件來識別所需參數。有關如何提供關於來源資料格式的指令碼資訊的說明,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項。有關如何提供關於來源資料位置的指令碼資訊的說明,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。
如果您要從串流來源讀取資訊,您可以透過 create_data_frame_from_catalog 或 create_data_frame_from_options 方法為任務提供來源資訊。請注意,這些方法會傳回 Apache Spark DataFrames
。
儘管參考文件參考的是 create_dynamic_frame_from_catalog
,但產生的程式碼會呼叫 create_dynamic_frame.from_catalog
。這些方法最終會呼叫相同的程式碼,並包含在內,以便您可以編寫更簡潔的程式碼。您可以檢視 Python 包裝函式的來源,藉以確認這一點,該包裝函式可在 aws-glue-libs
步驟 4. 使用 AWS Glue 來轉換資料
在 ETL 程序中擷取來源資料後,您需要說明要如何變更資料。您可以在 AWS Glue Studio 視覺化編輯器中建立轉換節點來提供此資訊。
在此步驟中,您需為 ApplyMapping
方法提供目前所需欄位名稱和類型的映射,以轉換您的 DynamicFrame
。
執行以下轉換。
-
捨棄這四個
location
和province
索引鍵。 -
將
officer
的名稱變更為officer_name
。 -
將
ticket_number
和set_fine_amount
的類型變更為float
。
create_dynamic_frame.from_catalog
將為您提供 DynamicFrame
物件。A DynamicFrame
代表 AWS Glue 中的資料集。 AWS Glue 轉換是改變的操作DynamicFrames
。
注意
什麼是 DynamicFrame
?
DynamicFrame
是一種抽象,允許您連接資料集,該資料集中包含資料中項目的名稱和類型的描述。在阿帕奇星火,類似的抽象存在稱為 DataFrame. 如需的說明 DataFrames,請參閱星火 SQL 指南
DynamicFrames
可讓您動態描述資料集結構描述。考慮一個帶有價格列的數據集,其中一些條目將價格存儲為字符串,而其他條目則將價格存儲為雙精度。 AWS Glue 會計算結構描述 on-the-fly — 它會為每一列建立自我描述記錄。
不一致的欄位 (如價格) 在框架結構描述中用類型 (ChoiceType
) 明確表示。您可以使用 DropFields
捨棄不一致的欄位,或使用 ResolveChoice
來解析欄位,藉以處理欄位不一致的問題。這些轉換可從 DynamicFrame
取得。然後,您可以使用 writeDynamicFrame
將資料寫回資料湖。
您可以從 DynamicFrame
類別上的方法呼叫許多相同的轉換,這樣可以得到更具可讀性的指令碼。如需 DynamicFrame
的相關資訊,請參閱 DynamicFrame 類別。
在此程序中,您需使用 ApplyMapping
編寫下列程式碼。此程式碼是產生的範例指令碼的一部分。
ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[ ("tag_number_masked", "string", "tag_number_masked", "string"), ("date_of_infraction", "string", "date_of_infraction", "string"), ("ticket_date", "string", "ticket_date", "string"), ("ticket_number", "decimal", "ticket_number", "float"), ("officer", "decimal", "officer_name", "decimal"), ("infraction_code", "decimal", "infraction_code", "decimal"), ("infraction_description", "string", "infraction_description", "string"), ("set_fine_amount", "decimal", "set_fine_amount", "float"), ("time_of_infraction", "decimal", "time_of_infraction", "decimal"), ], transformation_ctx="ApplyMapping_node2", )
若要使用 AWS Glue 轉換資料
-
檢查文件以識別要變更和捨棄欄位的轉換。如需詳細資訊,請參閱 GlueTransform base 類別。選擇
ApplyMapping
轉換。如需ApplyMapping
的相關資訊,請參閱 ApplyMapping 類別。在ApplyMapping
轉換物件上呼叫apply
。注意
什麼是
ApplyMapping
?ApplyMapping
採用DynamicFrame
並對其進行轉換。這需使用代表欄位轉換的元組清單,亦即「映射」。前兩個元組元素 (欄位名稱和類型) 用於識別框架中的欄位。第二對參數同樣是欄位名稱和類型。ApplyMapping 將源字段轉換為目標名稱,然後鍵入一個新的
DynamicFrame
,它返回。未提供的欄位會在傳回值中遭到捨棄。您可以呼叫與
DynamicFrame
上的apply_mapping
方法相同的轉換 (而不必呼叫apply
) 來建立更流暢、更易讀的程式碼。如需詳細資訊,請參閱 apply_mapping。 -
檢查
ApplyMapping
的文件以識別所需參數。請參閱ApplyMapping 類別。您會發現此方法需要frame
和mappings
參數。為ApplyMapping
提供必要的參數。 -
選用 – 為該方法提供
transformation_ctx
,以支援任務書籤。您可以在以下章節中了解有關任務書籤的資訊:選用 – 啟用任務書籤。
注意
步驟 5. 將資料載入目標
轉換資料之後,通常需將轉換後的資料存放在與來源不同的地方。您可以在 AWS Glue Studio 視覺化編輯器中建立目標節點來執行此作業。
在此步驟中,您需為 write_dynamic_frame.from_options
方法提供 connection_type
、connection_options
、format
和 format_options
,將資料載入 Amazon S3 中的目標儲存貯體。
在步驟 1 中,您初始化了 GlueContext
物件。在 AWS Glue 中,您可以在這裡找到用來設定目標的方法,就像來源一樣。
在此程序中,您需使用 write_dynamic_frame.from_options
編寫下列程式碼。此程式碼是產生的範例指令碼的一部分。
S3bucket_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="glueparquet", connection_options={"path": "s3://
DOC-EXAMPLE-BUCKET
", "partitionKeys": []}, format_options={"compression": "gzip"}, transformation_ctx="S3bucket_node3", )
將資料載入目標
-
檢查文件,查找將資料載入目標 Amazon S3 儲存貯體的方法。這些方法記錄於 GlueContext 類。選擇 write_dynamic_frame_from_options 方法。在
glueContext
上呼叫此方法。注意
常用資料載入方法
write_dynamic_frame.from_options
是最常用的資料載入方法,它支持 AWS Glue 中可用的所有目標。如果您要寫入 AWS Glue 連線中定義的 JDBC 目標,請使用此方write_dynamic_frame_from_jdbc_conf法。 AWS Glue 連線會儲存如何連線至資料來源的相關資訊。如此一來,您就不需在
connection_options
提供該資訊,不過您仍然需要使用connection_options
來提供dbtable
。write_dynamic_frame.from_catalog
不是常用的資料載入方法。此方法會在不更新基礎資料集的情況下更新 AWS Glue 資料型錄,並與變更基礎資料集的其他程序一起使用。如需詳細資訊,請參閱 更新結構描述,並使用 在 Data Catalog 中新增分割區 AWS Glue ETL 任務。 -
檢查 write_dynamic_frame_from_options 的文件。此方法需要
frame
、connection_type
、format
、connection_options
和format_options
。在glueContext
上呼叫此方法。-
請參閱有關
format_options
和format
的補充文件以識別您需要的參數。如需資料格式的說明,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項。 -
請參閱有關
connection_type
和connection_options
的補充文件以識別您需要的參數。如需連線的說明,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。 -
為
write_dynamic_frame.from_options
提供必要的參數。此方法的組態與create_dynamic_frame.from_options
類似。
-
-
選用 – 向
write_dynamic_frame.from_options
提供transformation_ctx
,以支援任務書籤。您可以在以下章節中了解有關任務書籤的資訊:選用 – 啟用任務書籤。
步驟 6. 遞交 Job
物件
您在步驟 1 中初始化了 Job
物件。您需要在指令碼結尾手動斷定其生命週期,某些選用功能需要此操作才能正常執行。這項工作是在 AWS Glue 工作室幕後完成的。
在此步驟中,在 Job
物件上呼叫 commit
方法。
在此程序中,您需編寫下列程式碼。此程式碼是產生的範例指令碼的一部分。
job.commit()
遞交 Job
物件
-
如果您尚未執行此操作,請執行先前章節中概述的選用步驟,將
transformation_ctx
納入其中。 -
呼叫
commit
。
選用 – 啟用任務書籤
之前的每個步驟都已經指示您設定 transformation_ctx
參數。這與名為任務書籤的功能有關。
藉由任務書籤,您可針對資料集定期執行任務,達到節省時間和金錢的目的,同時還能輕鬆追蹤先前的工作。Job 書籤會追蹤先前執行資料集中 AWS Glue 轉換的進度。通過跟踪先前運行結束的位置, AWS Glue 可以將其工作限制為以前未處理的行。如需任務書籤的詳細資訊,請參閱 使用任務書籤追蹤處理的資料。
若要啟用任務書籤,請先新增 transformation_ctx
陳述式至我們所提供的函數,如之前的範例所述。任務書籤狀態會在執行期間維持不變,而 transformation_ctx
參數是存取該狀態所需的索引鍵。這些陳述式自身不會執行任何操作。您還需要在任務的組態中啟用該功能。
在此程序中,您會使用 AWS Management Console啟用任務書籤。
啟用任務書籤
-
導覽至相應任務的 Job details (任務詳細資訊) 一節。
-
將 Job bookmark (任務書籤) 設定為 Enable (啟用)。
步驟 7. 將程式碼作為任務執行
在此步驟中,您需執行任務以確認您是否已成功完成本教學課程。這是通過點擊一個按鈕來完成的,就像在 AWS Glue Studio 可視化編輯器中一樣。
將程式碼作為任務執行
-
在標題列上選擇 Untitled job (未命名任務) 以編輯和設定您的任務名稱。
-
導覽至 Job details (任務詳細資訊) 索引標籤。為您的任務指派 IAM Role (IAM 角色)。您可以在 AWS Glue Studio 教學課程的先決條件中使用 AWS CloudFormation 範本所建立的範本所建立的項目。如果您已完成該教學課程,則該角色應可用,名為
AWS Glue StudioRole
。 -
選擇 Save (儲存) 以儲存您的指令碼。
-
選擇 Run (執行) 以執行您的任務。
-
導覽至 Runs (執行) 索引標籤,以確認您的任務已完成。
-
導覽至
DOC-EXAMPLE-BUCKET
,即write_dynamic_frame.from_options
的目標。確認輸出符合您的期望。
如需有關設定和管理任務的詳細資訊,請參閱 提供您的自訂指令碼。
其他資訊
阿帕奇星火庫和方法在 AWS Glue 腳本中可用。您可以查看 Spark 文件以了解您可以使用這些包含的程式庫來做什麼。如需詳細資訊,請參閱 Spark 來源儲存庫的範例區段
AWS Glue 2.0 + 默認情況下包括幾個常見的 Python 庫。在 Scala 或 Python 環境中,還有一些機制可以將自己的依賴項加載到 AWS Glue 作業中。如需 Python 相依性的相關資訊,請參閱 以 AWS Glue 使用 Python 程式庫。
有關如何在 Python 中使用 AWS Glue 功能的更多示例,請參閱AWS Glue Python 程式碼範例。Scala 和 Python 任務具有相同的功能,所以我們的 Python 範例應能為您提供以 Scala 執行類似工作的些許想法。