Pruebas a un esquema - AWS Glue

Pruebas a un esquema

Mientras desarrolla el código, debe realizar pruebas locales para verificar que el diseño del flujo de trabajo es correcto.

Las pruebas locales no generan trabajos, rastreadores o desencadenadores de AWS Glue. En su lugar, ejecute el script de diseño a nivel local y utilice los métodos to_json() y validate() para imprimir objetos y buscar errores. Estos métodos están disponibles en las tres clases definidas en las bibliotecas.

Hay dos formas de manejar los argumentos user_params y system_params que AWS Glue transfiere a la función de diseño. Su código de banco de pruebas puede crear un diccionario de valores de parámetros del proyecto de ejemplo y transferirlo a la función de diseño como el argumento user_params. O bien, puede eliminar las referencias a user_params y reemplazarlas con cadenas codificadas.

Si su código hace uso de las propiedades region y accountId en el argumento system_params, puede transferir su propio diccionario para system_params.

Para realizar pruebas a un proyecto
  1. Inicie un intérprete de Python en un directorio con las bibliotecas, o cargue los archivos del proyecto y las bibliotecas suministradas en su entorno de desarrollo integrado (IDE) preferido.

  2. Asegúrese de que el código importe las bibliotecas suministradas.

  3. Agregue código a su función de diseño para realizar llamadas a validate() o to_json() en cualquier entidad o en el objeto de Workflow. Por ejemplo, si su código crea un objeto de Crawler denominado mycrawler, puede llamar a validate() de la siguiente manera.

    mycrawler.validate()

    Puede imprimir mycrawler de la siguiente manera:

    print(mycrawler.to_json())

    Si llama a to_json en un objeto, no hay necesidad de llamar también a validate(), ya que to_json() llama a validate().

    Es muy útil llamar a estos métodos en el objeto de flujo de trabajo. Suponiendo que el script nombra el objeto de flujo de trabajo my_workflow, valide e imprima el objeto de flujo de trabajo de la siguiente manera.

    print(my_workflow.to_json())

    Para obtener más información sobre to_json() y validate(), consulte Métodos de clase.

    También puede importar pprint e imprimir el objeto de flujo de trabajo, tal y como se muestra en el ejemplo que se encuentra a continuación en esta sección.

  4. Ejecute el código, corrija los errores y, por último, elimine cualquier llamada a validate() o to_json().

El siguiente ejemplo muestra cómo construir un diccionario de parámetros del proyecto de ejemplo y transferirlo como el argumento user_params a la función de diseño generate_compaction_workflow. También muestra cómo imprimir el objeto de flujo de trabajo generado.

from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)