Enquanto você desenvolve seu código, você deve executar testes locais para verificar se o layout do fluxo de trabalho está correto.
Testes locais não geram trabalhos, crawlers ou acionadores do AWS Glue. Em vez disso, execute o script de layout localmente e use os métodos to_json()
e validate()
para imprimir objetos e encontrar erros. Esses métodos estão disponíveis em todas as três classes definidas nas bibliotecas.
Há duas maneiras de lidar com os argumentos user_params
e system_params
que o AWS Glue transmite para sua função de layout. Seu código de banco de teste pode criar um dicionário de valores de parâmetro de blueprint de exemplo e transmiti-lo para a função de layout como o argumento user_params
. Ou você pode remover as referências a user_params
e substituí-las por strings em código fixo.
Se o seu código usar as propriedades region
e accountId
no argumento system_params
, você pode transmitir seu próprio dicionário para system_params
.
Para testar um blueprint
-
Inicie um interpretador Python em um diretório com as bibliotecas ou carregue os arquivos de blueprint e as bibliotecas fornecidas em seu ambiente de desenvolvimento integrado (IDE) preferido.
-
Certifique-se de que o código importe as bibliotecas fornecidas.
-
Adicione código à sua função de layout para chamar
validate()
outo_json()
em qualquer entidade ou no objetoWorkflow
. Por exemplo, se o seu código criar um objetoCrawler
chamadomycrawler
, é possível chamarvalidate()
como a seguir.mycrawler.validate()
Você pode imprimir
mycrawler
como a seguir:print(mycrawler.to_json())
Se você chamar
to_json
em um objeto, não há necessidade de também chamarvalidate()
, poisto_json()
chamavalidate()
.É mais útil chamar esses métodos no objeto de fluxo de trabalho. Supondo que seu script nomeie o objeto de fluxo de trabalho
my_workflow
, valide e imprima o objeto de fluxo de trabalho da seguinte forma:print(my_workflow.to_json())
Para obter mais informações sobre
to_json()
evalidate()
, consulte Classe Methods.Você também pode importar
pprint
e formatar o estilo do objeto do fluxo de trabalho, conforme mostrado no exemplo mais adiante nesta seção. -
Execute o código, corrija erros e, finalmente, remova quaisquer chamadas para
validate()
outo_json()
.
O exemplo a seguir mostra como construir um dicionário de parâmetros de blueprint de exemplo e transmiti-lo como o argumento user_params
para a função de layout generate_compaction_workflow
. Ele também mostra como formatar o estilo do objeto de fluxo de trabalho gerado.
from pprint import pprint
from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
USER_PARAMS = {"WorkflowName": "compaction_workflow",
"ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py",
"PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL",
"DatabaseName": "cloudtrial",
"TableName": "ct_cloudtrail",
"CoalesceFactor": 4,
"MaxThreadWorkers": 200}
def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow:
compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job",
Command={"Name": "glueetl",
"ScriptLocation": user_params['ScriptLocation'],
"PythonVersion": "3"},
Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault",
DefaultArguments={"DatabaseName": user_params['DatabaseName'],
"TableName": user_params['TableName'],
"CoalesceFactor": user_params['CoalesceFactor'],
"max_thread_workers": user_params['MaxThreadWorkers']})
catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]}
compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl",
Targets = catalog_target,
Role=user_params['PassRole'],
DependsOn={compaction_job: "SUCCEEDED"},
WaitForDependencies="AND",
SchemaChangePolicy={"DeleteBehavior": "LOG"})
compaction_workflow = Workflow(Name=user_params['WorkflowName'],
Entities=Entities(Jobs=[compaction_job],
Crawlers=[compacted_files_crawler]))
return compaction_workflow
generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={})
gen_dict = generated.to_json()
pprint(gen_dict)