Testar um esquema
Enquanto você desenvolve seu código, você deve executar testes locais para verificar se o layout do fluxo de trabalho está correto.
Testes locais não geram trabalhos, crawlers ou acionadores do AWS Glue. Em vez disso, execute o script de layout localmente e use os métodos to_json()
e validate()
para imprimir objetos e encontrar erros. Esses métodos estão disponíveis em todas as três classes definidas nas bibliotecas.
Há duas maneiras de lidar com os argumentos user_params
e system_params
que o AWS Glue transmite para sua função de layout. Seu código de banco de teste pode criar um dicionário de valores de parâmetro de blueprint de exemplo e transmiti-lo para a função de layout como o argumento user_params
. Ou você pode remover as referências a user_params
e substituí-las por strings em código fixo.
Se o seu código usar as propriedades region
e accountId
no argumento system_params
, você pode transmitir seu próprio dicionário para system_params
.
Para testar um blueprint
-
Inicie um interpretador Python em um diretório com as bibliotecas ou carregue os arquivos de blueprint e as bibliotecas fornecidas em seu ambiente de desenvolvimento integrado (IDE) preferido.
-
Certifique-se de que o código importe as bibliotecas fornecidas.
-
Adicione código à sua função de layout para chamar
validate()
outo_json()
em qualquer entidade ou no objetoWorkflow
. Por exemplo, se o seu código criar um objetoCrawler
chamadomycrawler
, é possível chamarvalidate()
como a seguir.mycrawler.validate()
Você pode imprimir
mycrawler
como a seguir:print(mycrawler.to_json())
Se você chamar
to_json
em um objeto, não há necessidade de também chamarvalidate()
, poisto_json()
chamavalidate()
.É mais útil chamar esses métodos no objeto de fluxo de trabalho. Supondo que seu script nomeie o objeto de fluxo de trabalho
my_workflow
, valide e imprima o objeto de fluxo de trabalho da seguinte forma:print(my_workflow.to_json())
Para obter mais informações sobre
to_json()
evalidate()
, consulte Classe Methods.Você também pode importar
pprint
e formatar o estilo do objeto do fluxo de trabalho, conforme mostrado no exemplo mais adiante nesta seção. -
Execute o código, corrija erros e, finalmente, remova quaisquer chamadas para
validate()
outo_json()
.
O exemplo a seguir mostra como construir um dicionário de parâmetros de blueprint de exemplo e transmiti-lo como o argumento user_params
para a função de layout generate_compaction_workflow
. Ele também mostra como formatar o estilo do objeto de fluxo de trabalho gerado.
from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)