本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
測試藍圖
當您開發程式碼時,您應該執行本機測試,以確認工作流程配置是否正確。
本機測試不會產生 AWS Glue 任務、爬蟲程式或觸發程序。相反地,您可以在本機執行配置指令碼,並使用 to_json()
和 validate()
方法來列印物件並尋找錯誤。這些方法可用於程式庫中定義的全部三個類別。
有兩種方式可以處理 AWS Glue 傳遞給您配置函數的 user_params
和 system_params
引數。您的測試工作台程式碼可以建立範例藍圖參數值的字典,並將其作為 user_params
引數傳遞給配置函數。或者,您也可以移除對 user_params
的參考並用硬式編碼字串替換它們。
如果您的程式碼在 system_params
引數中使用 region
和 accountId
屬性,您可以傳入自己的 system_params
字典。
測試藍圖
-
在具有程式庫的目錄中啟動 Python 解釋器,或將藍圖檔案和提供的程式庫載入到您偏好的整合開發環境 (IDE)。
-
確保您的程式碼匯入提供的程式庫。
-
將程式碼新增到配置函數以在任何實體或
Workflow
物件上呼叫validate()
或to_json()
。例如,如果您的程式碼建立名為mycrawler
的Crawler
物件,您可以如下所示呼叫validate()
。mycrawler.validate()
您可以如下所示列印
mycrawler
:print(mycrawler.to_json())
如果您在物件上呼叫
to_json
,則不需要同時呼叫validate()
,因為to_json()
會呼叫validate()
。在工作流程物件上呼叫這些方法非常有用。假設您的指令碼將工作流程物件命名為
my_workflow
,請如下所示驗證和列印工作流程物件。print(my_workflow.to_json())
如需
to_json()
和validate()
的詳細資訊,請參閱「Class 方法」。您也可以匯入
pprint
,然後美化顯示工作流程物件,如本節稍後的範例所示。 -
執行程式碼、修正錯誤,最後移除對
validate()
或to_json()
的呼叫。
下列範例顯示如何建構範例藍圖參數的字典,並將其作為 user_params
引數傳遞給配置函數 generate_compaction_workflow
。它也會顯示如何美化顯示產生的工作流程物件。
from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)