Testar um esquema - AWS Glue

Testar um esquema

Enquanto você desenvolve seu código, você deve executar testes locais para verificar se o layout do fluxo de trabalho está correto.

Testes locais não geram trabalhos, crawlers ou acionadores do AWS Glue. Em vez disso, execute o script de layout localmente e use os métodos to_json() e validate() para imprimir objetos e encontrar erros. Esses métodos estão disponíveis em todas as três classes definidas nas bibliotecas.

Há duas maneiras de lidar com os argumentos user_params e system_params que o AWS Glue transmite para sua função de layout. Seu código de banco de teste pode criar um dicionário de valores de parâmetro de blueprint de exemplo e transmiti-lo para a função de layout como o argumento user_params. Ou você pode remover as referências a user_params e substituí-las por strings em código fixo.

Se o seu código usar as propriedades region e accountId no argumento system_params, você pode transmitir seu próprio dicionário para system_params.

Para testar um blueprint
  1. Inicie um interpretador Python em um diretório com as bibliotecas ou carregue os arquivos de blueprint e as bibliotecas fornecidas em seu ambiente de desenvolvimento integrado (IDE) preferido.

  2. Certifique-se de que o código importe as bibliotecas fornecidas.

  3. Adicione código à sua função de layout para chamar validate() ou to_json() em qualquer entidade ou no objeto Workflow. Por exemplo, se o seu código criar um objeto Crawler chamado mycrawler, é possível chamar validate() como a seguir.

    mycrawler.validate()

    Você pode imprimir mycrawler como a seguir:

    print(mycrawler.to_json())

    Se você chamar to_json em um objeto, não há necessidade de também chamar validate(), pois to_json() chama validate().

    É mais útil chamar esses métodos no objeto de fluxo de trabalho. Supondo que seu script nomeie o objeto de fluxo de trabalho my_workflow, valide e imprima o objeto de fluxo de trabalho da seguinte forma:

    print(my_workflow.to_json())

    Para obter mais informações sobre to_json() e validate(), consulte Classe Methods.

    Você também pode importar pprint e formatar o estilo do objeto do fluxo de trabalho, conforme mostrado no exemplo mais adiante nesta seção.

  4. Execute o código, corrija erros e, finalmente, remova quaisquer chamadas para validate() ou to_json().

O exemplo a seguir mostra como construir um dicionário de parâmetros de blueprint de exemplo e transmiti-lo como o argumento user_params para a função de layout generate_compaction_workflow. Ele também mostra como formatar o estilo do objeto de fluxo de trabalho gerado.

from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)