Pruebas a un esquema
Mientras desarrolla el código, debe realizar pruebas locales para verificar que el diseño del flujo de trabajo es correcto.
Las pruebas locales no generan trabajos, rastreadores o desencadenadores de AWS Glue. En su lugar, ejecute el script de diseño a nivel local y utilice los métodos to_json()
y validate()
para imprimir objetos y buscar errores. Estos métodos están disponibles en las tres clases definidas en las bibliotecas.
Hay dos formas de manejar los argumentos user_params
y system_params
que AWS Glue transfiere a la función de diseño. Su código de banco de pruebas puede crear un diccionario de valores de parámetros del proyecto de ejemplo y transferirlo a la función de diseño como el argumento user_params
. O bien, puede eliminar las referencias a user_params
y reemplazarlas con cadenas codificadas.
Si su código hace uso de las propiedades region
y accountId
en el argumento system_params
, puede transferir su propio diccionario para system_params
.
Para realizar pruebas a un proyecto
-
Inicie un intérprete de Python en un directorio con las bibliotecas, o cargue los archivos del proyecto y las bibliotecas suministradas en su entorno de desarrollo integrado (IDE) preferido.
-
Asegúrese de que el código importe las bibliotecas suministradas.
-
Agregue código a su función de diseño para realizar llamadas a
validate()
oto_json()
en cualquier entidad o en el objeto deWorkflow
. Por ejemplo, si su código crea un objeto deCrawler
denominadomycrawler
, puede llamar avalidate()
de la siguiente manera.mycrawler.validate()
Puede imprimir
mycrawler
de la siguiente manera:print(mycrawler.to_json())
Si llama a
to_json
en un objeto, no hay necesidad de llamar también avalidate()
, ya queto_json()
llama avalidate()
.Es muy útil llamar a estos métodos en el objeto de flujo de trabajo. Suponiendo que el script nombra el objeto de flujo de trabajo
my_workflow
, valide e imprima el objeto de flujo de trabajo de la siguiente manera.print(my_workflow.to_json())
Para obtener más información sobre
to_json()
yvalidate()
, consulte Métodos de clase.También puede importar
pprint
e imprimir el objeto de flujo de trabajo, tal y como se muestra en el ejemplo que se encuentra a continuación en esta sección. -
Ejecute el código, corrija los errores y, por último, elimine cualquier llamada a
validate()
oto_json()
.
El siguiente ejemplo muestra cómo construir un diccionario de parámetros del proyecto de ejemplo y transferirlo como el argumento user_params
a la función de diseño generate_compaction_workflow
. También muestra cómo imprimir el objeto de flujo de trabajo generado.
from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)