测试蓝图
在开发代码时,应执行本地测试以验证工作流布局是否正确。
本地测试不会生成 AWS Glue 任务、爬网程序或触发器。相反,您可以在本地运行布局脚本并使用 to_json()
和 validate()
方法打印对象并查找错误。这些方法在库中定义的所有三个类中都可用。
可通过两种方式处理 AWS Glue 传递到您的布局函数的 user_params
和 system_params
参数。您的测试平台代码可以创建示例蓝图参数值的字典,并将其作为 user_params
参数传递到布局函数。或者,您可以移除对 user_params
的引用并使用硬编码字符串替代他们。
如果您的代码使用 system_params
参数中的 region
和 accountId
属性,您可以传入所拥有的 system_params
字典。
要测试蓝图
-
在包含库的目录中启动 Python 解释器,或将蓝图文件和提供的库加载到首选的集成开发环境(IDE)中。
-
确保您的代码导入提供的库。
-
将代码添加到布局函数以调用在任何实体上或在
Workflow
对象上的validate()
或to_json()
。例如,如果您的代码创建了一个名为mycrawler
的Crawler
对象,您可以调用validate()
,如下所示。mycrawler.validate()
您可以打印
mycrawler
,如下所示:print(mycrawler.to_json())
如果您对一个对象调用
to_json
,则不需要同时调用validate()
,因为to_json()
调用了validate()
。对工作流对象调用这些方法非常有用。假设您的脚本命名工作流对象
my_workflow
,验证并打印工作流对象,如下所示。print(my_workflow.to_json())
有关
to_json()
和validate()
的更多信息,请参阅 类方法。您还可以导入
pprint
并美观地打印列工作流对象,如本部分后面的示例所示。 -
运行代码,修复错误,最后删除对
validate()
或者to_json()
的调用。
以下示例说明如何构建示例蓝图参数字典并将其作为 user_params
参数传入布局函数 generate_compaction_workflow
。它还说明了如何美观地打印生成的工作流对象。
from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)