测试蓝图 - AWS Glue

测试蓝图

在开发代码时,应执行本地测试以验证工作流布局是否正确。

本地测试不会生成 AWS Glue 任务、爬网程序或触发器。相反,您可以在本地运行布局脚本并使用 to_json()validate() 方法打印对象并查找错误。这些方法在库中定义的所有三个类中都可用。

可通过两种方式处理 AWS Glue 传递到您的布局函数的 user_paramssystem_params 参数。您的测试平台代码可以创建示例蓝图参数值的字典,并将其作为 user_params 参数传递到布局函数。或者,您可以移除对 user_params 的引用并使用硬编码字符串替代他们。

如果您的代码使用 system_params 参数中的 regionaccountId 属性,您可以传入所拥有的 system_params 字典。

要测试蓝图
  1. 在包含库的目录中启动 Python 解释器,或将蓝图文件和提供的库加载到首选的集成开发环境(IDE)中。

  2. 确保您的代码导入提供的库。

  3. 将代码添加到布局函数以调用在任何实体上或在 Workflow 对象上的 validate()to_json()。例如,如果您的代码创建了一个名为 mycrawlerCrawler 对象,您可以调用 validate(),如下所示。

    mycrawler.validate()

    您可以打印 mycrawler,如下所示:

    print(mycrawler.to_json())

    如果您对一个对象调用 to_json,则不需要同时调用 validate(),因为 to_json() 调用了 validate()

    对工作流对象调用这些方法非常有用。假设您的脚本命名工作流对象 my_workflow,验证并打印工作流对象,如下所示。

    print(my_workflow.to_json())

    有关 to_json()validate() 的更多信息,请参阅 类方法

    您还可以导入 pprint 并美观地打印列工作流对象,如本部分后面的示例所示。

  4. 运行代码,修复错误,最后删除对 validate() 或者 to_json() 的调用。

以下示例说明如何构建示例蓝图参数字典并将其作为 user_params 参数传入布局函数 generate_compaction_workflow。它还说明了如何美观地打印生成的工作流对象。

from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)