Test d'un plan - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Test d'un plan

Pendant que vous développez votre code, vous devez effectuer des tests locaux pour vérifier que la structure du flux de travail est correcte.

Les tests locaux ne génèrent pas de tâches, d'crawlers ou de déclencheurs AWS Glue. Au lieu de cela, vous exécutez le script de structure localement et utilisez la méthode to_json() et validate() pour imprimer des objets et trouver des erreurs. Ces méthodes sont disponibles dans les trois classes définies dans les bibliothèques.

Il existe deux façons de gérer les arguments user_params et system_params que AWS Glue passe à votre fonction de structure. Votre code de banc de test peut créer un dictionnaire d'exemples de valeurs de paramètre de modèle et le transmettre à la fonction de structure en tant qu'argument user_params. Vous pouvez également supprimer les références à user_params et remplacez-les par des chaînes codées en dur.

Si votre code utilise les propriétés region et accountId dans system_params, vous pouvez passer votre propre dictionnaire pour system_params.

Pour tester un modèle
  1. Démarrez un interpréteur Python dans un répertoire avec les bibliothèques, ou chargez les fichiers du modèle et les bibliothèques fournies dans votre environnement de développement intégré (IDE) préféré.

  2. Assurez-vous que votre code importe les bibliothèques fournies.

  3. Ajouter du code à votre fonction de structure pour appeler validate() ou to_json() sur n'importe quelle entité ou sur l'objet Workflow. Par exemple, si votre code crée un objet Crawler nommé mycrawler, vous pouvez appeler validate() comme suit.

    mycrawler.validate()

    Vous pouvez imprimer mycrawler comme suit :

    print(mycrawler.to_json())

    Si vous appelez to_json sur un objet, il n'est pas nécessaire d'appeler également validate(), parce que to_json() appelle validate().

    Il est plus utile d'appeler ces méthodes sur l'objet de flux de travail. En supposant que votre script nomme l'objet de flux de travail my_workflow, validez et imprimez l'objet de flux de travail comme suit.

    print(my_workflow.to_json())

    Pour plus d'informations sur to_json() et validate(), consultez Méthodes de classe.

    Vous pouvez également importer pprint et imprimer l'objet de flux de travail, tel que décrit dans l'exemple plus loin dans cette section.

  4. Exécutez le code, corrigez les erreurs et supprimez enfin tous les appels à validate() ou to_json().

L'exemple suivant montre comment construire un dictionnaire d'exemples de paramètres de modèle et le transmettre en tant qu'argument user_params à la fonction de structure generate_compaction_workflow. L'exemple montre également comment imprimer automatiquement l'objet de flux de travail généré.

from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)