本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
當您開發程式碼時,您應該執行本機測試,以確認工作流程配置是否正確。
本機測試不會產生 AWS Glue 任務、爬蟲程式或觸發程序。相反地,您可以在本機執行配置指令碼,並使用 to_json()
和 validate()
方法來列印物件並尋找錯誤。這些方法可用於程式庫中定義的全部三個類別。
有兩種方式可以處理 AWS Glue 傳遞給您配置函數的 user_params
和 system_params
引數。您的測試工作台程式碼可以建立範例藍圖參數值的字典,並將其作為 user_params
引數傳遞給配置函數。或者,您也可以移除對 user_params
的參考並用硬式編碼字串替換它們。
如果您的程式碼在 system_params
引數中使用 region
和 accountId
屬性,您可以傳入自己的 system_params
字典。
測試藍圖
-
在具有程式庫的目錄中啟動 Python 解釋器,或將藍圖檔案和提供的程式庫載入到您偏好的整合開發環境 (IDE)。
-
確保您的程式碼匯入提供的程式庫。
-
將程式碼新增到配置函數以在任何實體或
Workflow
物件上呼叫validate()
或to_json()
。例如,如果您的程式碼建立名為mycrawler
的Crawler
物件,您可以如下所示呼叫validate()
。mycrawler.validate()
您可以如下所示列印
mycrawler
:print(mycrawler.to_json())
如果您在物件上呼叫
to_json
,則不需要同時呼叫validate()
,因為to_json()
會呼叫validate()
。在工作流程物件上呼叫這些方法非常有用。假設您的指令碼將工作流程物件命名為
my_workflow
,請如下所示驗證和列印工作流程物件。print(my_workflow.to_json())
如需
to_json()
和validate()
的更多相關資訊,請參閱Class 方法。您也可以匯入
pprint
,然後美化顯示工作流程物件,如本節稍後的範例所示。 -
執行程式碼、修正錯誤,最後移除對
validate()
或to_json()
的呼叫。
下列範例顯示如何建構範例藍圖參數的字典,並將其作為 user_params
引數傳遞給配置函數 generate_compaction_workflow
。它也會顯示如何美化顯示產生的工作流程物件。
from pprint import pprint
from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
USER_PARAMS = {"WorkflowName": "compaction_workflow",
"ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py",
"PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL",
"DatabaseName": "cloudtrial",
"TableName": "ct_cloudtrail",
"CoalesceFactor": 4,
"MaxThreadWorkers": 200}
def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow:
compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job",
Command={"Name": "glueetl",
"ScriptLocation": user_params['ScriptLocation'],
"PythonVersion": "3"},
Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault",
DefaultArguments={"DatabaseName": user_params['DatabaseName'],
"TableName": user_params['TableName'],
"CoalesceFactor": user_params['CoalesceFactor'],
"max_thread_workers": user_params['MaxThreadWorkers']})
catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]}
compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl",
Targets = catalog_target,
Role=user_params['PassRole'],
DependsOn={compaction_job: "SUCCEEDED"},
WaitForDependencies="AND",
SchemaChangePolicy={"DeleteBehavior": "LOG"})
compaction_workflow = Workflow(Name=user_params['WorkflowName'],
Entities=Entities(Jobs=[compaction_job],
Crawlers=[compacted_files_crawler]))
return compaction_workflow
generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={})
gen_dict = generated.to_json()
pprint(gen_dict)