Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Test d'un plan
Pendant que vous développez votre code, vous devez effectuer des tests locaux pour vérifier que la structure du flux de travail est correcte.
Les tests locaux ne génèrent pas de tâches, d'crawlers ou de déclencheurs AWS Glue. Au lieu de cela, vous exécutez le script de structure localement et utilisez la méthode to_json()
et validate()
pour imprimer des objets et trouver des erreurs. Ces méthodes sont disponibles dans les trois classes définies dans les bibliothèques.
Il existe deux façons de gérer les arguments user_params
et system_params
que AWS Glue passe à votre fonction de structure. Votre code de banc de test peut créer un dictionnaire d'exemples de valeurs de paramètre de modèle et le transmettre à la fonction de structure en tant qu'argument user_params
. Vous pouvez également supprimer les références à user_params
et remplacez-les par des chaînes codées en dur.
Si votre code utilise les propriétés region
et accountId
dans system_params
, vous pouvez passer votre propre dictionnaire pour system_params
.
Pour tester un modèle
-
Démarrez un interpréteur Python dans un répertoire avec les bibliothèques, ou chargez les fichiers du modèle et les bibliothèques fournies dans votre environnement de développement intégré (IDE) préféré.
-
Assurez-vous que votre code importe les bibliothèques fournies.
-
Ajouter du code à votre fonction de structure pour appeler
validate()
outo_json()
sur n'importe quelle entité ou sur l'objetWorkflow
. Par exemple, si votre code crée un objetCrawler
nommémycrawler
, vous pouvez appelervalidate()
comme suit.mycrawler.validate()
Vous pouvez imprimer
mycrawler
comme suit :print(mycrawler.to_json())
Si vous appelez
to_json
sur un objet, il n'est pas nécessaire d'appeler égalementvalidate()
, parce queto_json()
appellevalidate()
.Il est plus utile d'appeler ces méthodes sur l'objet de flux de travail. En supposant que votre script nomme l'objet de flux de travail
my_workflow
, validez et imprimez l'objet de flux de travail comme suit.print(my_workflow.to_json())
Pour plus d'informations sur
to_json()
etvalidate()
, consultez Méthodes de classe.Vous pouvez également importer
pprint
et imprimer l'objet de flux de travail, tel que décrit dans l'exemple plus loin dans cette section. -
Exécutez le code, corrigez les erreurs et supprimez enfin tous les appels à
validate()
outo_json()
.
L'exemple suivant montre comment construire un dictionnaire d'exemples de paramètres de modèle et le transmettre en tant qu'argument user_params
à la fonction de structure generate_compaction_workflow
. L'exemple montre également comment imprimer automatiquement l'objet de flux de travail généré.
from pprint import pprint from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * USER_PARAMS = {"WorkflowName": "compaction_workflow", "ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py", "PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL", "DatabaseName": "cloudtrial", "TableName": "ct_cloudtrail", "CoalesceFactor": 4, "MaxThreadWorkers": 200} def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow: compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job", Command={"Name": "glueetl", "ScriptLocation": user_params['ScriptLocation'], "PythonVersion": "3"}, Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault", DefaultArguments={"DatabaseName": user_params['DatabaseName'], "TableName": user_params['TableName'], "CoalesceFactor": user_params['CoalesceFactor'], "max_thread_workers": user_params['MaxThreadWorkers']}) catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]} compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl", Targets = catalog_target, Role=user_params['PassRole'], DependsOn={compaction_job: "SUCCEEDED"}, WaitForDependencies="AND", SchemaChangePolicy={"DeleteBehavior": "LOG"}) compaction_workflow = Workflow(Name=user_params['WorkflowName'], Entities=Entities(Jobs=[compaction_job], Crawlers=[compacted_files_crawler])) return compaction_workflow generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={}) gen_dict = generated.to_json() pprint(gen_dict)