測試藍圖 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

測試藍圖

當您開發程式碼時,您應該執行本機測試,以確認工作流程配置是否正確。

本機測試不會產生 AWS Glue 任務、爬蟲程式或觸發程序。相反地,您可以在本機執行配置指令碼,並使用 to_json()validate() 方法來列印物件並尋找錯誤。這些方法可用於程式庫中定義的全部三個類別。

有兩種方式可以處理 AWS Glue 傳遞給您配置函數的 user_paramssystem_params 引數。您的測試工作台程式碼可以建立範例藍圖參數值的字典,並將其作為 user_params 引數傳遞給配置函數。或者,您也可以移除對 user_params 的參考並用硬式編碼字串替換它們。

如果您的程式碼在 system_params 引數中使用 regionaccountId 屬性,您可以傳入自己的 system_params 字典。

測試藍圖
  1. 在具有程式庫的目錄中啟動 Python 解釋器,或將藍圖檔案和提供的程式庫載入到您偏好的整合開發環境 (IDE)。

  2. 確保您的程式碼匯入提供的程式庫。

  3. 將程式碼新增到配置函數以在任何實體或 Workflow 物件上呼叫 validate()to_json()。例如,如果您的程式碼建立名為 mycrawlerCrawler 物件,您可以如下所示呼叫 validate()

    mycrawler.validate()

    您可以如下所示列印 mycrawler

    print(mycrawler.to_json())

    如果您在物件上呼叫 to_json,則不需要同時呼叫 validate(),因為 to_json() 會呼叫 validate()

    在工作流程物件上呼叫這些方法非常有用。假設您的指令碼將工作流程物件命名為 my_workflow,請如下所示驗證和列印工作流程物件。

    print(my_workflow.to_json())

    如需 to_json()validate() 的詳細資訊,請參閱「Class 方法」。

    您也可以匯入 pprint,然後美化顯示工作流程物件,如本節稍後的範例所示。

  4. 執行程式碼、修正錯誤,最後移除對 validate()to_json() 的呼叫。

下列範例顯示如何建構範例藍圖參數的字典,並將其作為 user_params 引數傳遞給配置函數 generate_compaction_workflow。它也會顯示如何美化顯示產生的工作流程物件。

from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)