

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Invocation DAGs dans différents environnements Amazon MWAA
<a name="samples-invoke-dag"></a>

L'exemple de code suivant crée un jeton d'interface de ligne de commande Apache Airflow. Le code utilise ensuite un graphe acyclique dirigé (DAG) dans un environnement Amazon MWAA pour appeler un DAG dans un autre environnement Amazon MWAA.

**Topics**
+ [Version](#samples-invoke-dag-version)
+ [Conditions préalables](#samples-invoke-dag-prereqs)
+ [Permissions](#samples-invoke-dag-permissions)
+ [Dépendances](#samples-invoke-dag-dependencies)
+ [Exemple de code](#samples-invoke-dag-code)

## Version
<a name="samples-invoke-dag-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-invoke-dag-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous avez besoin des éléments suivants :
+ Deux [environnements Amazon MWAA](get-started.md) avec accès au serveur Web du **réseau public**, y compris votre environnement actuel.
+ Un exemple de DAG chargé dans le bucket Amazon Simple Storage Service (Amazon S3) de votre environnement cible.

## Permissions
<a name="samples-invoke-dag-permissions"></a>

Pour utiliser l'exemple de code présenté sur cette page, le rôle d'exécution de votre environnement doit être autorisé à créer un jeton d'interface de ligne de commande Apache Airflow. Vous pouvez joindre la politique AWS-managed `AmazonMWAAAirflowCliAccess` pour accorder cette autorisation.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Pour plus d’informations, consultez [Politique de la CLI Apache Airflow : Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dépendances
<a name="samples-invoke-dag-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-invoke-dag-code"></a>

L'exemple de code suivant suppose que vous utilisez un DAG dans votre environnement actuel pour appeler un DAG dans un autre environnement.

1. Dans votre terminal, accédez au répertoire dans lequel votre code DAG est enregistré. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nom`invoke_dag.py`. Remplacez les valeurs suivantes par vos informations.
   + `your-new-environment-name`— Le nom de l'autre environnement dans lequel vous souhaitez appeler le DAG.
   + `your-target-dag-id`— L'ID du DAG dans l'autre environnement que vous souhaitez invoquer.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si le DAG s'exécute correctement, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches pour`invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Pour vérifier que votre DAG a bien été invoqué, accédez à l'interface utilisateur Apache Airflow de votre nouvel environnement, puis procédez comme suit :

   1. Sur la **DAGs**page, localisez votre nouveau DAG cible dans la liste des DAGs.

   1. Sous **Dernière exécution**, vérifiez l'horodatage de la dernière exécution du DAG. Cet horodatage doit correspondre étroitement à l'horodatage le plus récent de votre autre `invoke_dag` environnement.

   1. Sous **Tâches récentes**, vérifiez que la dernière exécution a été réussie.